From d396d451727d5886610846fa3273c1afe55e62bf Mon Sep 17 00:00:00 2001 From: Shankari Date: Mon, 14 Dec 2020 18:31:19 -0800 Subject: [PATCH 1/6] Make the server matching code consistent with the phone code Add additional checks for the end_ts. If this doesn't work, we should change both. The client code has been tested more extensively in real world conditions than the server code, so picking that preferentially --- emission/storage/decorations/trip_queries.py | 55 +++++++++++++------ .../tests/storageTests/TestTripQueries.py | 1 + .../tests/storageTests/analysis_ts_common.py | 2 + 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/emission/storage/decorations/trip_queries.py b/emission/storage/decorations/trip_queries.py index 8dfbc1be0..157e1f6bb 100644 --- a/emission/storage/decorations/trip_queries.py +++ b/emission/storage/decorations/trip_queries.py @@ -7,6 +7,7 @@ from builtins import * import logging import pymongo +import arrow import emission.storage.timeseries.timequery as estt @@ -75,22 +76,48 @@ def get_user_input_for_trip(trip_key, user_id, trip_id, user_input_key): trip_obj = ts.get_entry_from_id(trip_key, trip_id) return get_user_input_for_trip_object(ts, trip_obj, user_input_key) +# Additional checks to be consistent with the phone code +# www/js/diary/services.js +# Since that has been tested the most +# If we no longer need these checks (maybe with trip editing), we can remove them +def valid_user_input(trip_obj): + def curried(user_input): + # we know that the trip is cleaned so we can use the fmt_time + # but the confirm objects are not necessarily filled out + fmt_ts = lambda ts, tz: arrow.get(ts).to(tz) + logging.debug("Comparing user input %s -> %s, trip %s -> %s, checks are (%s) && (%s) || (%s)" % ( + fmt_ts(user_input.data.start_ts, user_input.metadata.time_zone), + fmt_ts(user_input.data.end_ts, user_input.metadata.time_zone), + trip_obj.data.start_fmt_time, trip_obj.data.end_fmt_time, + (user_input.data.start_ts >= trip_obj.data.start_ts), + (user_input.data.end_ts <= trip_obj.data.end_ts), + ((user_input.data.end_ts - trip_obj.data.end_ts) <= 5 * 60) + )) + return (user_input.data.start_ts >= trip_obj.data.start_ts and + (user_input.data.end_ts <= trip_obj.data.end_ts or + ((user_input.data.end_ts - trip_obj.data.end_ts) <= 5 * 60))) + return curried + +def final_candidate(trip_obj, potential_candidates): + potential_candidate_objects = [ecwe.Entry(c) for c in potential_candidates] + extra_filtered_potential_candidates = list(filter(valid_user_input(trip_obj), potential_candidate_objects)) + if len(extra_filtered_potential_candidates) == 0: + return None + + sorted_pc = sorted(extra_filtered_potential_candidates, key=lambda c:c["metadata"]["write_ts"]) + most_recent_entry = extra_filtered_potential_candidates[-1] + logging.debug("most recent entry has id %s" % most_recent_entry) + logging.debug("and is mapped to entry %s" % most_recent_entry) + return most_recent_entry + def get_user_input_for_trip_object(ts, trip_obj, user_input_key): tq = estt.TimeQuery("data.start_ts", trip_obj.data.start_ts, trip_obj.data.end_ts) # In general, all candiates will have the same start_ts, so no point in # sorting by it. Only exception to general rule is when user first provides # input before the pipeline is run, and then overwrites after pipeline is # run - potential_candidates = ts.get_data_df(user_input_key, tq) - if len(potential_candidates) == 0: - return None - - sorted_pc = potential_candidates.sort_values(by="metadata_write_ts") - most_recent_entry_id = potential_candidates._id.iloc[-1] - logging.debug("most recent entry has id %s" % most_recent_entry_id) - ret_val = ts.get_entry_from_id(user_input_key, most_recent_entry_id) - logging.debug("and is mapped to entry %s" % ret_val) - return ret_val + potential_candidates = ts.find_entries([user_input_key], tq) + return final_candidate(trip_obj, potential_candidates) # This is almost an exact copy of get_user_input_for_trip_object, but it # retrieves an interable instead of a dataframe. So almost everything is @@ -100,10 +127,4 @@ def get_user_input_for_trip_object(ts, trip_obj, user_input_key): def get_user_input_from_cache_series(user_id, trip_obj, user_input_key): tq = estt.TimeQuery("data.start_ts", trip_obj.data.start_ts, trip_obj.data.end_ts) potential_candidates = estsc.find_entries(user_id, [user_input_key], tq) - if len(potential_candidates) == 0: - return None - sorted_pc = sorted(potential_candidates, key=lambda c:c["metadata"]["write_ts"]) - most_recent_entry = potential_candidates[-1] - logging.debug("most recent entry has id %s" % most_recent_entry["_id"]) - logging.debug("and is mapped to entry %s" % most_recent_entry) - return ecwe.Entry(most_recent_entry) + return final_candidate(trip_obj, potential_candidates) diff --git a/emission/tests/storageTests/TestTripQueries.py b/emission/tests/storageTests/TestTripQueries.py index 4a8ad487a..c006e3efe 100644 --- a/emission/tests/storageTests/TestTripQueries.py +++ b/emission/tests/storageTests/TestTripQueries.py @@ -40,6 +40,7 @@ def setUp(self): def tearDown(self): edb.get_analysis_timeseries_db().delete_many({'user_id': self.testUserId}) + edb.get_timeseries_db().delete_many({'user_id': self.testUserId}) edb.get_usercache_db().delete_many({'user_id': self.testUserId}) def create_fake_trip(self): diff --git a/emission/tests/storageTests/analysis_ts_common.py b/emission/tests/storageTests/analysis_ts_common.py index 4af048bb6..65fa0d0a4 100644 --- a/emission/tests/storageTests/analysis_ts_common.py +++ b/emission/tests/storageTests/analysis_ts_common.py @@ -21,7 +21,9 @@ def createNewTripLike(utest, key, wrapper): new_trip = wrapper() new_trip.start_ts = 5 + new_trip.start_fmt_time = "5 secs" new_trip.end_ts = 6 + new_trip.end_fmt_time = "6 secs" new_trip_id = esta.TimeSeries.get_time_series(utest.testUserId).insert_data( utest.testUserId, key, new_trip) new_trip_entry = esta.TimeSeries.get_time_series(utest.testUserId).get_entry_from_id( From 6c4890edfbfdac48e72ba75d469a07ec683940c6 Mon Sep 17 00:00:00 2001 From: Shankari Date: Tue, 15 Dec 2020 14:58:35 -0800 Subject: [PATCH 2/6] Add the new wrapper classes for the confirmed trip and section objects Fairly straightforward change, can be used as an example of how to add a new analysis class that doesn't need the formatters to be created. --- emission/core/wrapper/confirmedsection.py | 30 +++++++++++++++++++ emission/core/wrapper/confirmedtrip.py | 25 ++++++++++++++++ emission/core/wrapper/entry.py | 4 +++ emission/core/wrapper/pipelinestate.py | 2 ++ .../storage/timeseries/builtin_timeseries.py | 4 ++- 5 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 emission/core/wrapper/confirmedsection.py create mode 100644 emission/core/wrapper/confirmedtrip.py diff --git a/emission/core/wrapper/confirmedsection.py b/emission/core/wrapper/confirmedsection.py new file mode 100644 index 000000000..97195219d --- /dev/null +++ b/emission/core/wrapper/confirmedsection.py @@ -0,0 +1,30 @@ +from __future__ import unicode_literals +from __future__ import print_function +from __future__ import division +from __future__ import absolute_import +from future import standard_library +standard_library.install_aliases() +from builtins import * +import emission.core.wrapper.cleanedsection as ecwc +import emission.core.wrapper.wrapperbase as ecwb +import emission.core.wrapper.modeprediction as ecwm + +class Confirmedsection(ecwc.Cleanedsection): + props = ecwc.Cleanedsection.props + # for a detailed explanation of what the various modes represent, + # see https://github.com/e-mission/e-mission-docs/issues/476 + props.update({"cleaned_section": ecwb.WrapperBase.Access.WORM, + "inferred_section": ecwb.WrapperBase.WORM, + "inferred_mode": ecwb.WrapperBase.WORM, # inferred by mode inference algo + "confirmed_mode": ecwb.WrapperBase.WORM, # confirmed by user +# mode to be used for analysis; confirmed mode if we know factors for it, inferred mode otherwise + "analysis_mode": ecwb.WrapperBase.WORM, +# mode for user display; inferred mode if not confirmed; confirmed mode otherwise + "display_mode": ecwb.WrapperBase.WORM + }) + + enums = {"inferred_mode": ecwm.PredictedModeTypes, + "analysis_mode": ecwm.PredictedModeTypes} + + def _populateDependencies(self): + super(ecwc.Confirmedsection, self)._populateDependencies() diff --git a/emission/core/wrapper/confirmedtrip.py b/emission/core/wrapper/confirmedtrip.py new file mode 100644 index 000000000..3d6357e93 --- /dev/null +++ b/emission/core/wrapper/confirmedtrip.py @@ -0,0 +1,25 @@ +from __future__ import unicode_literals +from __future__ import print_function +from __future__ import division +from __future__ import absolute_import +from future import standard_library +standard_library.install_aliases() +from builtins import * +import emission.core.wrapper.trip as ecwt +import emission.core.wrapper.wrapperbase as ecwb + +class Confirmedtrip(ecwt.Trip): + props = ecwt.Trip.props + props.update({"raw_trip": ecwb.WrapperBase.Access.WORM, + "cleaned_trip": ecwb.WrapperBase.Access.WORM, +# the confirmed section that is the "primary" +# https://github.com/e-mission/e-mission-docs/issues/476#issuecomment-738120752 + "primary_section": ecwb.WrapperBase.Access.WORM, + "inferred_primary_mode": ecwb.WrapperBase.Access.WORM, +# the user input will have all `manual/*` entries +# let's make that be somewhat flexible instead of hardcoding into the data model + "user_input": ecwb.WrapperBase.Access.WORM + }) + + def _populateDependencies(self): + super(Confirmedtrip, self)._populateDependencies() diff --git a/emission/core/wrapper/entry.py b/emission/core/wrapper/entry.py index 40a64eab5..0986ed657 100644 --- a/emission/core/wrapper/entry.py +++ b/emission/core/wrapper/entry.py @@ -127,6 +127,10 @@ def _getData2Wrapper(): # running the inference step "analysis/inferred_section": "inferredsection", ### ** END: prediction objects + ### ** BEGIN: confirmed objects which combine inferred and user input values + "analysis/confirmed_trip": "confirmedtrip", + "analysis/confirmed_section": "confirmedsection" + ### ** END: confirmed objects which combine inferred and user input values } @staticmethod diff --git a/emission/core/wrapper/pipelinestate.py b/emission/core/wrapper/pipelinestate.py index ae2de6f69..3f978569b 100644 --- a/emission/core/wrapper/pipelinestate.py +++ b/emission/core/wrapper/pipelinestate.py @@ -11,12 +11,14 @@ class PipelineStages(enum.Enum): USERCACHE = 0 + USER_INPUT_MATCH_INCOMING = 12 ACCURACY_FILTERING = 6 TRIP_SEGMENTATION = 1 SECTION_SEGMENTATION = 2 JUMP_SMOOTHING = 3 CLEAN_RESAMPLING = 11 MODE_INFERENCE = 4 + CREATE_CONFIRMED_OBJECTS = 13 TOUR_MODEL = 5 ALTERNATIVES = 10 USER_MODEL = 7 diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py index b8c0b82c3..be36ac1ce 100644 --- a/emission/storage/timeseries/builtin_timeseries.py +++ b/emission/storage/timeseries/builtin_timeseries.py @@ -81,7 +81,9 @@ def __init__(self, user_id): "metrics/daily_user_median_speed": self.analysis_timeseries_db, "metrics/daily_mean_median_speed": self.analysis_timeseries_db, "inference/prediction": self.analysis_timeseries_db, - "analysis/inferred_section": self.analysis_timeseries_db + "analysis/inferred_section": self.analysis_timeseries_db, + "analysis/confirmed_trip": self.analysis_timeseries_db, + "analysis/confirmed_section": self.analysis_timeseries_db } From 0630fab56497d8504a6e21c0f950e3956a67ea34 Mon Sep 17 00:00:00 2001 From: Shankari Date: Tue, 15 Dec 2020 15:05:35 -0800 Subject: [PATCH 3/6] Create pipeline steps for confirmed trips + fully populate the post matching step Concrete changes: - implement a post matching step as follows - add a configurable user input key list to the analysis configuration - for each entry in the list, find the matching entry - add all the labels from the matching entries into a single dict - create a confirmed_trip object by copying the cleaned_trip - create a user_input field with those labels - add the appropriate pipeline state manipulation functions - extend the pipepline to call pre and post matching steps. The pre step is currently a NOP Minor fixes to the matching code: - use `find_entries` for both the timeseries and usercache calls so we can unify the code - return the most recent entry by write timestamp instead of by data ts since the trip data values can differ between draft and cleaned trips New test for this which loads the user inputs, runs the pipeline and compares the confirmed trips Testing done: New test harness passes --- conf/analysis/debug.conf.json.sample | 3 +- emission/analysis/userinput/__init__.py | 0 emission/analysis/userinput/matcher.py | 66 ++ emission/pipeline/intake_stage.py | 16 + emission/storage/decorations/trip_queries.py | 18 +- emission/storage/pipeline_queries.py | 34 +- .../userInputTests/TestUserInput.py | 149 +++ emission/tests/common.py | 6 +- ...ankari_2016-06-20.expected_confirmed_trips | 500 ++++++++++ .../shankari_2016-06-20.user_inputs | 920 ++++++++++++++++++ 10 files changed, 1699 insertions(+), 13 deletions(-) create mode 100644 emission/analysis/userinput/__init__.py create mode 100644 emission/analysis/userinput/matcher.py create mode 100644 emission/tests/analysisTests/userInputTests/TestUserInput.py create mode 100644 emission/tests/data/real_examples/shankari_2016-06-20.expected_confirmed_trips create mode 100644 emission/tests/data/real_examples/shankari_2016-06-20.user_inputs diff --git a/conf/analysis/debug.conf.json.sample b/conf/analysis/debug.conf.json.sample index 7724ce44f..0021220f9 100644 --- a/conf/analysis/debug.conf.json.sample +++ b/conf/analysis/debug.conf.json.sample @@ -7,5 +7,6 @@ "classification.inference.mode.useBusTrainFeatureIndices": true, "classification.validityAssertions": true, "output.conversion.validityAssertions": true, - "analysis.result.section.key": "analysis/inferred_section" + "analysis.result.section.key": "analysis/inferred_section", + "userinput.keylist": ["manual/mode_confirm", "manual/purpose_confirm"] } diff --git a/emission/analysis/userinput/__init__.py b/emission/analysis/userinput/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/emission/analysis/userinput/matcher.py b/emission/analysis/userinput/matcher.py new file mode 100644 index 000000000..081f48d6d --- /dev/null +++ b/emission/analysis/userinput/matcher.py @@ -0,0 +1,66 @@ +import logging +import copy + +# Get the configuration for the classifier +import emission.analysis.config as eac + +import emission.storage.timeseries.abstract_timeseries as esta +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.storage.decorations.trip_queries as esdt +import emission.storage.pipeline_queries as epq +import emission.core.wrapper.entry as ecwe + +def match_incoming_user_inputs(user_id): + time_query = epq.get_time_range_for_incoming_userinput_match(user_id) + print("this is currently a no-op") + epq.mark_incoming_userinput_match_done(user_id, None) + +def create_confirmed_objects(user_id): + time_query = epq.get_time_range_for_confirmed_object_creation(user_id) + try: + last_cleaned_trip_done = create_confirmed_trips(user_id, time_query) + if last_cleaned_trip_done is None: + logging.debug("after run, last_cleaned_trip_done == None, must be early return") + epq.mark_confirmed_object_creation_done(user_id, None) + else: + epq.mark_confirmed_object_creation_done(user_id, last_cleaned_trip_done.data.end_ts) + except: + logging.exception("Error while creating confirmed objects, timestamp is unchanged") + epq.mark_confirmed_object_creation_failed(user_id) + +def create_confirmed_trips(user_id, timerange): + ts = esta.TimeSeries.get_time_series(user_id) + toConfirmTrips = esda.get_entries(esda.CLEANED_TRIP_KEY, user_id, + time_query=timerange) + logging.debug("Converting %d cleaned trips to confirmed ones" % len(toConfirmTrips)) + lastTripProcessed = None + if len(toConfirmTrips) == 0: + logging.debug("len(toConfirmTrips) == 0, early return") + return None + input_key_list = eac.get_config()["userinput.keylist"] + for tct in toConfirmTrips: + # Copy the trip and fill in the new values + confirmed_trip_dict = copy.copy(tct) + del confirmed_trip_dict["_id"] + confirmed_trip_dict["metadata"]["key"] = "analysis/confirmed_trip" + confirmed_trip_dict["data"]["cleaned_trip"] = tct.get_id() + confirmed_trip_dict["data"]["user_input"] = \ + get_user_input_dict(ts, tct, input_key_list) + confirmed_trip_entry = ecwe.Entry(confirmed_trip_dict) + # save the entry + ts.insert(confirmed_trip_entry) + # if everything is successful, then update the last successful trip + lastTripProcessed = tct + + return lastTripProcessed + +def get_user_input_dict(ts, tct, input_key_list): + tct_userinput = {} + for ikey in input_key_list: + matched_userinput = esdt.get_user_input_for_trip_object(ts, tct, ikey) + if matched_userinput is not None: + ikey_name = ikey.split("/")[1] + tct_userinput[ikey_name] = matched_userinput.data.label + logging.debug("for trip %s, returning user input dict %s" % (tct.get_id(), tct_userinput)) + return tct_userinput + diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index c4e818f18..9657fb2fe 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -23,6 +23,7 @@ import emission.storage.timeseries.abstract_timeseries as esta import emission.storage.timeseries.aggregate_timeseries as estag +import emission.analysis.userinput.matcher as eaum import emission.analysis.intake.cleaning.filter_accuracy as eaicf import emission.analysis.intake.segmentation.trip_segmentation as eaist import emission.analysis.intake.segmentation.section_segmentation as eaiss @@ -88,6 +89,13 @@ def run_intake_pipeline_for_user(uuid): esds.store_pipeline_time(uuid, ecwp.PipelineStages.USERCACHE.name, time.time(), uct.elapsed) + with ect.Timer() as uit: + logging.info("*" * 10 + "UUID %s: updating incoming user inputs" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: updating incoming user inputs" % uuid + "*" * 10) + eaum.match_incoming_user_inputs(uuid) + + esds.store_pipeline_time(uuid, ecwp.PipelineStages.USER_INPUT_MATCH_INCOMING.name, + time.time(), uct.elapsed) # Hack until we delete these spurious entries # https://github.com/e-mission/e-mission-server/issues/407#issuecomment-2484868 @@ -150,6 +158,14 @@ def run_intake_pipeline_for_user(uuid): esds.store_pipeline_time(uuid, ecwp.PipelineStages.MODE_INFERENCE.name, time.time(), crt.elapsed) + with ect.Timer() as crt: + logging.info("*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10) + eaum.create_confirmed_objects(uuid) + + esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS.name, + time.time(), crt.elapsed) + with ect.Timer() as ogt: logging.info("*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) diff --git a/emission/storage/decorations/trip_queries.py b/emission/storage/decorations/trip_queries.py index 157e1f6bb..0467317e5 100644 --- a/emission/storage/decorations/trip_queries.py +++ b/emission/storage/decorations/trip_queries.py @@ -85,7 +85,8 @@ def curried(user_input): # we know that the trip is cleaned so we can use the fmt_time # but the confirm objects are not necessarily filled out fmt_ts = lambda ts, tz: arrow.get(ts).to(tz) - logging.debug("Comparing user input %s -> %s, trip %s -> %s, checks are (%s) && (%s) || (%s)" % ( + logging.debug("Comparing user input %s: %s -> %s, trip %s -> %s, checks are (%s) && (%s) || (%s)" % ( + user_input.data.label, fmt_ts(user_input.data.start_ts, user_input.metadata.time_zone), fmt_ts(user_input.data.end_ts, user_input.metadata.time_zone), trip_obj.data.start_fmt_time, trip_obj.data.end_fmt_time, @@ -104,18 +105,19 @@ def final_candidate(trip_obj, potential_candidates): if len(extra_filtered_potential_candidates) == 0: return None + # In general, all candiates will have the same start_ts, so no point in + # sorting by it. Only exception to general rule is when user first provides + # input before the pipeline is run, and then overwrites after pipeline is + # run sorted_pc = sorted(extra_filtered_potential_candidates, key=lambda c:c["metadata"]["write_ts"]) - most_recent_entry = extra_filtered_potential_candidates[-1] - logging.debug("most recent entry has id %s" % most_recent_entry) - logging.debug("and is mapped to entry %s" % most_recent_entry) + logging.debug("sorted candidates are %s" % [(c.metadata.write_fmt_time, c.data.label) for c in sorted_pc]) + most_recent_entry = sorted_pc[-1] + logging.debug("most recent entry is %s, %s" % + (most_recent_entry.metadata.write_fmt_time, most_recent_entry.data.label)) return most_recent_entry def get_user_input_for_trip_object(ts, trip_obj, user_input_key): tq = estt.TimeQuery("data.start_ts", trip_obj.data.start_ts, trip_obj.data.end_ts) - # In general, all candiates will have the same start_ts, so no point in - # sorting by it. Only exception to general rule is when user first provides - # input before the pipeline is run, and then overwrites after pipeline is - # run potential_candidates = ts.find_entries([user_input_key], tq) return final_candidate(trip_obj, potential_candidates) diff --git a/emission/storage/pipeline_queries.py b/emission/storage/pipeline_queries.py index 978aa4f88..56183282d 100644 --- a/emission/storage/pipeline_queries.py +++ b/emission/storage/pipeline_queries.py @@ -38,6 +38,19 @@ def mark_accuracy_filtering_done(user_id, last_processed_ts): def mark_accuracy_filtering_failed(user_id): mark_stage_failed(user_id, ps.PipelineStages.ACCURACY_FILTERING) +def get_time_range_for_incoming_userinput_match(user_id): + return get_time_range_for_stage(user_id, ps.PipelineStages.USER_INPUT_MATCH_INCOMING) + +def mark_incoming_userinput_match_done(user_id, last_processed_ts): + if last_processed_ts is None: + mark_stage_done(user_id, ps.PipelineStages.USER_INPUT_MATCH_INCOMING, None) + else: + mark_stage_done(user_id, ps.PipelineStages.USER_INPUT_MATCH_INCOMING, + last_processed_ts + END_FUZZ_AVOID_LTE) + +def mark_incoming_userinput_match_failed(user_id): + mark_stage_failed(user_id, ps.PipelineStages.USER_INPUT_MATCH_INCOMING) + def get_time_range_for_segmentation(user_id): return get_time_range_for_stage(user_id, ps.PipelineStages.TRIP_SEGMENTATION) @@ -108,10 +121,25 @@ def mark_mode_inference_complete(user_id): def mark_mode_inference_failed(user_id): mark_stage_failed(user_id, ps.PipelineStages.MODE_INFERENCE) +def get_time_range_for_confirmed_object_creation(user_id): + tq = get_time_range_for_stage(user_id, ps.PipelineStages.CREATE_CONFIRMED_OBJECTS) + tq.timeType = "data.end_ts" + return tq + +def mark_confirmed_object_creation_done(user_id, last_processed_ts): + if last_processed_ts is None: + mark_stage_done(user_id, ps.PipelineStages.CREATE_CONFIRMED_OBJECTS, None) + else: + mark_stage_done(user_id, ps.PipelineStages.CREATE_CONFIRMED_OBJECTS, + last_processed_ts + END_FUZZ_AVOID_LTE) + +def mark_confirmed_object_creation_failed(user_id): + mark_stage_failed(user_id, ps.PipelineStages.CREATE_CONFIRMED_OBJECTS) + def get_complete_ts(user_id): - mode_infer_state = get_current_state(user_id, ps.PipelineStages.MODE_INFERENCE) - if mode_infer_state is not None: - return mode_infer_state.last_processed_ts + create_confirmed_state = get_current_state(user_id, ps.PipelineStages.CREATE_CONFIRMED_OBJECTS) + if create_confirmed_state is not None: + return create_confirmed_state.last_processed_ts else: cleaned_state = get_current_state(user_id, ps.PipelineStages.CLEAN_RESAMPLING) if cleaned_state is not None: diff --git a/emission/tests/analysisTests/userInputTests/TestUserInput.py b/emission/tests/analysisTests/userInputTests/TestUserInput.py new file mode 100644 index 000000000..a4b2938e7 --- /dev/null +++ b/emission/tests/analysisTests/userInputTests/TestUserInput.py @@ -0,0 +1,149 @@ +import unittest +import logging +import json +import bson.json_util as bju +import argparse +import numpy as np + +# Our imports +import emission.core.get_database as edb +import emission.core.wrapper.localdate as ecwl +import emission.core.wrapper.entry as ecwe +import emission.storage.timeseries.abstract_timeseries as esta +import emission.storage.timeseries.tcquery as estc + +import emission.net.usercache.abstract_usercache_handler as enuah +import emission.analysis.plotting.geojson.geojson_feature_converter as gfc +import emission.storage.timeseries.tcquery as estt +import emission.core.common as ecc + +# Test imports +import emission.tests.common as etc + +class TestUserInput(unittest.TestCase): + def setUp(self): + # Thanks to M&J for the number! + np.random.seed(61297777) + logging.info("setUp complete") + + def tearDown(self): + logging.debug("Clearing related databases for %s" % self.testUUID) + # Clear the database only if it is not an evaluation run + # A testing run validates that nothing has changed + # An evaluation run compares to different algorithm implementations + # to determine whether to switch to a new implementation + if not hasattr(self, "evaluation") or not self.evaluation: + self.clearRelatedDb() + if hasattr(self, "analysis_conf_path"): + os.remove(self.analysis_conf_path) + logging.info("tearDown complete") + + def clearRelatedDb(self): + logging.info("Timeseries delete result %s" % edb.get_timeseries_db().delete_many({"user_id": self.testUUID}).raw_result) + logging.info("Analysis delete result %s" % edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID}).raw_result) + logging.info("Usercache delete result %s" % edb.get_usercache_db().delete_many({"user_id": self.testUUID}).raw_result) + + def compare_trip_result(self, result_dicts, expect_dicts): + # This is basically a bunch of asserts to ensure that the timeline is as + # expected. We are not using a recursive diff because things like the IDs + # will change from run to run. Instead, I pick out a bunch of important + # things that are highly user visible + # Since this is deterministic, we can also include things that are not that user visible :) + result = [ecwe.Entry(r) for r in result_dicts] + expect = [ecwe.Entry(e) for e in expect_dicts] + + for rt, et in zip(result, expect): + logging.debug("Comparing %s -> %s with %s -> %s" % + (rt.data.start_fmt_time, rt.data.end_fmt_time, + et.data.start_fmt_time, et.data.end_fmt_time)) + self.assertEqual(len(result), len(expect)) + for rt, et in zip(result, expect): + logging.debug("======= Comparing trip =========") + logging.debug(json.dumps(rt, indent=4, default=bju.default)) + logging.debug(json.dumps(et, indent=4, default=bju.default)) + # Highly user visible + self.assertEqual(rt.data["user_input"], et.data["user_input"]) + # self.assertEqual(rt.data.inferred_primary_mode, et.data.inferred_primary_mode) + logging.debug(20 * "=") + + def compare_section_result(self, result, expect): + # This is basically a bunch of asserts to ensure that the timeline is as + # expected. We are not using a recursive diff because things like the IDs + # will change from run to run. Instead, I pick out a bunch of important + # things that are highly user visible + # Since this is deterministic, we can also include things that are not that user visible :) + + for rt, et in zip(result, expect): + logging.debug("Comparing %s -> %s with %s -> %s" % + (rt.start_fmt_time, rt.end_fmt_time, + et.start_fmt_time, et.end_fmt_time)) + self.assertEqual(len(result), len(expect)) + for rt, et in zip(result, expect): + logging.debug("======= Comparing section =========") + # Highly user visible + self.assertEqual(rt.inferred_mode, et.inferred_mode) + self.assertEqual(rt.confirmed_mode, et.confirmed_mode) + self.assertEqual(rt.analysis_mode, et.analysis_mode) + self.assertEqual(rt.display_mode, et.display_mode) + logging.debug(20 * "=") + + def checkConfirmedTripsAndSections(self, dataFile, ld, preload=False): + with open(dataFile+".ground_truth") as gfp: + ground_truth = json.load(gfp, object_hook=bju.object_hook) + + etc.setupRealExample(self, dataFile) + if (preload): + self.entries = json.load(open(dataFile+".user_inputs"), object_hook = bju.object_hook) + etc.setupRealExampleWithEntries(self) + etc.runIntakePipeline(self.testUUID) + if (not preload): + self.entries = json.load(open(dataFile+".user_inputs"), object_hook = bju.object_hook) + etc.setupRealExampleWithEntries(self) + etc.runIntakePipeline(self.testUUID) + ts = esta.TimeSeries.get_time_series(self.testUUID) + confirmed_trips = list(ts.find_entries(["analysis/confirmed_trip"], None)) + with open(dataFile+".expected_confirmed_trips") as dect: + expected_confirmed_trips = json.load(dect, object_hook = bju.object_hook) + self.compare_trip_result(confirmed_trips, expected_confirmed_trips) + +# confirmed_sections = ts.find_entries(["analysis/confirmed_section"], +# estc.TimeComponentQuery("data.local_dt", ld, ld)) +# with open(dataFile+".expected_confirmed_sections") as dect: +# expected_confirmed_sections = json.load(dect, object_hook = bju.object_hook) +# self.compare_section_result(confirmed_sections, expected_confirmed_sections) + + + def testJun20Preload(self): + # Tests matching where user input is stored before the pipeline is run + # - trips with a single match and an exact start/end match (easy case, user input on cleaned trip) + # ---- Trip to karate + # - trips with a single match and an start/end match after the cleaned start/end (user input on draft trip) + # ---- First trip to library + # - trips with multiple matches (pick most recent) + # ---- Trip from karate (in draft mode) + # ---- Trip back from library in the afternoon (in final mode) + # - user input with no matching trip (should be ignored) + # - trips with no matches + # ---- Trip back from library in the morning + # ---- Trip to library in the afternoon + # - trip that was first set in draft mode and then overriden in cleaned mode + # ---- Trip to karate + dataFile = "emission/tests/data/real_examples/shankari_2016-06-20" + ld = ecwl.LocalDate({'year': 2016, 'month': 6, 'day': 20}) + self.checkConfirmedTripsAndSections(dataFile, ld, preload=True) + +# def testJun20Postload(self): +# # Same as testJun20Preload, except that the user input arrives after the +# # pipeline is run for the first time, and the matching happens on the +# # next pipeline run +# dataFile = "emission/tests/data/real_examples/shankari_2016-06-20" +# ld = ecwl.LocalDate({'year': 2016, 'month': 6, 'day': 20}) +# self.checkConfirmedTripsAndSections(dataFile, ld, preload=False) + +if __name__ == '__main__': + etc.configLogging() + + parser = argparse.ArgumentParser() + parser.add_argument("--algo_change", + help="modifications to the algorithm", action="store_true") + unittest.main() diff --git a/emission/tests/common.py b/emission/tests/common.py index 9006b544d..00ba988dd 100644 --- a/emission/tests/common.py +++ b/emission/tests/common.py @@ -119,7 +119,8 @@ def createAndFillUUID(testObj): testObj.testUUID = uuid.uuid4() def setupRealExample(testObj, dump_file): - logging.info("Before loading, timeseries db size = %s" % edb.get_timeseries_db().estimated_document_count()) + logging.info("Before loading from %s, timeseries db size = %s" % + (dump_file, edb.get_timeseries_db().estimated_document_count())) with open(dump_file) as dfp: testObj.entries = json.load(dfp, object_hook = bju.object_hook) createAndFillUUID(testObj) @@ -163,6 +164,7 @@ def setupIncomingEntries(): def runIntakePipeline(uuid): # Move these imports here so that we don't inadvertently load the modules, # and any related config modules, before we want to + import emission.analysis.userinput.matcher as eaum import emission.analysis.intake.cleaning.filter_accuracy as eaicf import emission.storage.timeseries.format_hacks.move_filter_field as estfm import emission.analysis.intake.segmentation.trip_segmentation as eaist @@ -171,12 +173,14 @@ def runIntakePipeline(uuid): import emission.analysis.intake.cleaning.clean_and_resample as eaicr import emission.analysis.classification.inference.mode.pipeline as eacimp + eaum.match_incoming_user_inputs(uuid) eaicf.filter_accuracy(uuid) eaist.segment_current_trips(uuid) eaiss.segment_current_sections(uuid) eaicl.filter_current_sections(uuid) eaicr.clean_and_resample(uuid) eacimp.predict_mode(uuid) + eaum.create_confirmed_objects(uuid) def configLogging(): """ diff --git a/emission/tests/data/real_examples/shankari_2016-06-20.expected_confirmed_trips b/emission/tests/data/real_examples/shankari_2016-06-20.expected_confirmed_trips new file mode 100644 index 000000000..704899b1e --- /dev/null +++ b/emission/tests/data/real_examples/shankari_2016-06-20.expected_confirmed_trips @@ -0,0 +1,500 @@ +[ + { + "_id": { + "$oid": "5fd8e69ac61669a9ebad0241" + }, + "user_id": { + "$uuid": "aa9fdec92944446c8ee250d79b3044d3" + }, + "metadata": { + "key": "analysis/confirmed_trip", + "platform": "server", + "write_ts": 1608050275.276295, + "time_zone": "America/Los_Angeles", + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 15, + "hour": 8, + "minute": 37, + "second": 55, + "weekday": 1, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-15T08:37:55.276295-08:00" + }, + "data": { + "source": "DwellSegmentationTimeFilter", + "end_ts": 1466437275.856, + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 8, + "minute": 41, + "second": 15, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T08:41:15.856000-07:00", + "end_loc": { + "type": "Point", + "coordinates": [ + -122.0826931, + 37.3914184 + ] + }, + "raw_trip": { + "$oid": "5fd8e662baff4ef23d349789" + }, + "start_ts": 1466436483.395, + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 8, + "minute": 28, + "second": 3, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T08:28:03.395000-07:00", + "start_loc": { + "type": "Point", + "coordinates": [ + -122.0857861, + 37.3898049 + ] + }, + "duration": 792.4609999656677, + "distance": 1047.1630675866315, + "start_place": { + "$oid": "5fd8e664baff4ef23d349860" + }, + "end_place": { + "$oid": "5fd8e664baff4ef23d349861" + }, + "cleaned_trip": { + "$oid": "5fd8e663baff4ef23d3497af" + }, + "user_input": { + "mode_confirm": "walk", + "purpose_confirm": "library" + } + } + }, + { + "_id": { + "$oid": "5fd8e69ac61669a9ebad0242" + }, + "user_id": { + "$uuid": "aa9fdec92944446c8ee250d79b3044d3" + }, + "metadata": { + "key": "analysis/confirmed_trip", + "platform": "server", + "write_ts": 1608050275.488737, + "time_zone": "America/Los_Angeles", + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 15, + "hour": 8, + "minute": 37, + "second": 55, + "weekday": 1, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-15T08:37:55.488737-08:00" + }, + "data": { + "source": "DwellSegmentationTimeFilter", + "end_ts": 1466438022.959, + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 8, + "minute": 53, + "second": 42, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T08:53:42.959000-07:00", + "end_loc": { + "type": "Point", + "coordinates": [ + -122.0866181, + 37.3910231 + ] + }, + "raw_trip": { + "$oid": "5fd8e662baff4ef23d34978b" + }, + "start_ts": 1466437438.6453953, + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 8, + "minute": 43, + "second": 58, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T08:43:58.645395-07:00", + "start_loc": { + "type": "Point", + "coordinates": [ + -122.0826931, + 37.3914184 + ] + }, + "duration": 584.3136048316956, + "distance": 886.4937093667857, + "start_place": { + "$oid": "5fd8e664baff4ef23d349861" + }, + "end_place": { + "$oid": "5fd8e664baff4ef23d349862" + }, + "cleaned_trip": { + "$oid": "5fd8e663baff4ef23d3497ce" + }, + "user_input": {} + } + }, + { + "_id": { + "$oid": "5fd8e69ac61669a9ebad0243" + }, + "user_id": { + "$uuid": "aa9fdec92944446c8ee250d79b3044d3" + }, + "metadata": { + "key": "analysis/confirmed_trip", + "platform": "server", + "write_ts": 1608050275.7204192, + "time_zone": "America/Los_Angeles", + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 15, + "hour": 8, + "minute": 37, + "second": 55, + "weekday": 1, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-15T08:37:55.720419-08:00" + }, + "data": { + "source": "DwellSegmentationTimeFilter", + "end_ts": 1466461966.379, + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 32, + "second": 46, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T15:32:46.379000-07:00", + "end_loc": { + "type": "Point", + "coordinates": [ + -122.0830016, + 37.3901637 + ] + }, + "raw_trip": { + "$oid": "5fd8e662baff4ef23d34978d" + }, + "start_ts": 1466461623.1195338, + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 27, + "second": 3, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T15:27:03.119534-07:00", + "start_loc": { + "type": "Point", + "coordinates": [ + -122.0866181, + 37.3910231 + ] + }, + "duration": 343.25946617126465, + "distance": 610.2234223038181, + "start_place": { + "$oid": "5fd8e664baff4ef23d349862" + }, + "end_place": { + "$oid": "5fd8e664baff4ef23d349863" + }, + "cleaned_trip": { + "$oid": "5fd8e663baff4ef23d3497e6" + }, + "user_input": {} + } + }, + { + "_id": { + "$oid": "5fd8e69ac61669a9ebad0244" + }, + "user_id": { + "$uuid": "aa9fdec92944446c8ee250d79b3044d3" + }, + "metadata": { + "key": "analysis/confirmed_trip", + "platform": "server", + "write_ts": 1608050275.942955, + "time_zone": "America/Los_Angeles", + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 15, + "hour": 8, + "minute": 37, + "second": 55, + "weekday": 1, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-15T08:37:55.942955-08:00" + }, + "data": { + "source": "DwellSegmentationTimeFilter", + "end_ts": 1466462452.708, + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 40, + "second": 52, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T15:40:52.708000-07:00", + "end_loc": { + "type": "Point", + "coordinates": [ + -122.086605, + 37.3910011 + ] + }, + "raw_trip": { + "$oid": "5fd8e662baff4ef23d34978f" + }, + "start_ts": 1466462052.158904, + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 34, + "second": 12, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T15:34:12.158904-07:00", + "start_loc": { + "type": "Point", + "coordinates": [ + -122.0830016, + 37.3901637 + ] + }, + "duration": 400.54909586906433, + "distance": 405.97685486691756, + "start_place": { + "$oid": "5fd8e664baff4ef23d349863" + }, + "end_place": { + "$oid": "5fd8e664baff4ef23d349864" + }, + "cleaned_trip": { + "$oid": "5fd8e663baff4ef23d3497f6" + }, + "user_input": { + "mode_confirm": "walk", + "purpose_confirm": "home" + } + } + }, + { + "_id": { + "$oid": "5fd8e69ac61669a9ebad0245" + }, + "user_id": { + "$uuid": "aa9fdec92944446c8ee250d79b3044d3" + }, + "metadata": { + "key": "analysis/confirmed_trip", + "platform": "server", + "write_ts": 1608050276.1554408, + "time_zone": "America/Los_Angeles", + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 15, + "hour": 8, + "minute": 37, + "second": 56, + "weekday": 1, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-15T08:37:56.155441-08:00" + }, + "data": { + "source": "DwellSegmentationTimeFilter", + "end_ts": 1466463835.713, + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 16, + "minute": 3, + "second": 55, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T16:03:55.713000-07:00", + "end_loc": { + "type": "Point", + "coordinates": [ + -122.1081974, + 37.4168828 + ] + }, + "raw_trip": { + "$oid": "5fd8e662baff4ef23d349791" + }, + "start_ts": 1466462970.2807262, + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 49, + "second": 30, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T15:49:30.280726-07:00", + "start_loc": { + "type": "Point", + "coordinates": [ + -122.086605, + 37.3910011 + ] + }, + "duration": 865.4322738647461, + "distance": 4521.417177464177, + "start_place": { + "$oid": "5fd8e664baff4ef23d349864" + }, + "end_place": { + "$oid": "5fd8e664baff4ef23d349865" + }, + "cleaned_trip": { + "$oid": "5fd8e664baff4ef23d349808" + }, + "user_input": { + "mode_confirm": "shared_ride", + "purpose_confirm": "karate" + } + } + }, + { + "_id": { + "$oid": "5fd8e69ac61669a9ebad0246" + }, + "user_id": { + "$uuid": "aa9fdec92944446c8ee250d79b3044d3" + }, + "metadata": { + "key": "analysis/confirmed_trip", + "platform": "server", + "write_ts": 1608050276.546149, + "time_zone": "America/Los_Angeles", + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 15, + "hour": 8, + "minute": 37, + "second": 56, + "weekday": 1, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-15T08:37:56.546149-08:00" + }, + "data": { + "source": "DwellSegmentationTimeFilter", + "end_ts": 1466467959.767, + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 17, + "minute": 12, + "second": 39, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T17:12:39.767000-07:00", + "end_loc": { + "type": "Point", + "coordinates": [ + -122.0864051, + 37.3907649 + ] + }, + "raw_trip": { + "$oid": "5fd8e662baff4ef23d349795" + }, + "start_ts": 1466466584.0461695, + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 16, + "minute": 49, + "second": 44, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T16:49:44.046170-07:00", + "start_loc": { + "type": "Point", + "coordinates": [ + -122.1081974, + 37.4168828 + ] + }, + "duration": 1375.7208304405212, + "distance": 5136.824369981995, + "start_place": { + "$oid": "5fd8e664baff4ef23d349865" + }, + "end_place": { + "$oid": "5fd8e664baff4ef23d349867" + }, + "cleaned_trip": { + "$oid": "5fd8e664baff4ef23d34982c" + }, + "user_input": { + "mode_confirm": "shared_ride", + "purpose_confirm": "home" + } + } + } +] \ No newline at end of file diff --git a/emission/tests/data/real_examples/shankari_2016-06-20.user_inputs b/emission/tests/data/real_examples/shankari_2016-06-20.user_inputs new file mode 100644 index 000000000..b3a7fd7c9 --- /dev/null +++ b/emission/tests/data/real_examples/shankari_2016-06-20.user_inputs @@ -0,0 +1,920 @@ +[ + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f29" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/mode_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607970514.069, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 10, + "minute": 28, + "second": 34, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T10:28:34.069000-08:00" + }, + "data": { + "start_ts": 1466436506.53, + "end_ts": 1466437492.486, + "label": "walk", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 8, + "minute": 28, + "second": 26, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T08:28:26.530000-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 8, + "minute": 44, + "second": 52, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T08:44:52.486000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f2d" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/mode_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607970712.122, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 10, + "minute": 31, + "second": 52, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T10:31:52.122000-08:00" + }, + "data": { + "start_ts": 1466466680.964, + "end_ts": 1466468087.626, + "label": "shared_ride", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 16, + "minute": 51, + "second": 20, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T16:51:20.964000-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 17, + "minute": 14, + "second": 47, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T17:14:47.626000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f35" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/mode_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607970727.311, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 10, + "minute": 32, + "second": 7, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T10:32:07.311000-08:00" + }, + "data": { + "start_ts": 1466466680.964, + "end_ts": 1466468087.626, + "label": "drove_alone", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 16, + "minute": 51, + "second": 20, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T16:51:20.964000-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 17, + "minute": 14, + "second": 47, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T17:14:47.626000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f37" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/mode_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607970729.493, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 10, + "minute": 32, + "second": 9, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T10:32:09.493000-08:00" + }, + "data": { + "start_ts": 1466466680.964, + "end_ts": 1466468087.626, + "label": "shared_ride", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 16, + "minute": 51, + "second": 20, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T16:51:20.964000-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 17, + "minute": 14, + "second": 47, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T17:14:47.626000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f39" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/mode_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607972439.757, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 11, + "minute": 0, + "second": 39, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T11:00:39.757000-08:00" + }, + "data": { + "start_ts": 1466462052.158904, + "end_ts": 1466462452.708, + "label": "free_shuttle", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 34, + "second": 12, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T15:34:12.158904-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 40, + "second": 52, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T15:40:52.708000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f3f" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/mode_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607972461.225, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 11, + "minute": 1, + "second": 1, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T11:01:01.225000-08:00" + }, + "data": { + "start_ts": 1466462052.158904, + "end_ts": 1466462452.708, + "label": "bus", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 34, + "second": 12, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T15:34:12.158904-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 40, + "second": 52, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T15:40:52.708000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f41" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/mode_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607972464.372, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 11, + "minute": 1, + "second": 4, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T11:01:04.372000-08:00" + }, + "data": { + "start_ts": 1466462052.158904, + "end_ts": 1466462452.708, + "label": "walk", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 34, + "second": 12, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T15:34:12.158904-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 40, + "second": 52, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T15:40:52.708000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f43" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/mode_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607972631.351, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 11, + "minute": 3, + "second": 51, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T11:03:51.351000-08:00" + }, + "data": { + "start_ts": 1466463028.216, + "end_ts": 1466464013.985, + "label": "taxi", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 50, + "second": 28, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T15:50:28.216000-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 16, + "minute": 6, + "second": 53, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T16:06:53.985000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f47" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/mode_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607972700.068, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 11, + "minute": 5, + "second": 0, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T11:05:00.068000-08:00" + }, + "data": { + "start_ts": 1466462970.2807262, + "end_ts": 1466463835.713, + "label": "shared_ride", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 49, + "second": 30, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T15:49:30.280726-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 16, + "minute": 3, + "second": 55, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T16:03:55.713000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f2b" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/purpose_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607970529.123, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 10, + "minute": 28, + "second": 49, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T10:28:49.123000-08:00" + }, + "data": { + "start_ts": 1466436506.53, + "end_ts": 1466437492.486, + "label": "library", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 8, + "minute": 28, + "second": 26, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T08:28:26.530000-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 8, + "minute": 44, + "second": 52, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T08:44:52.486000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f2f" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/purpose_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607970716.731, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 10, + "minute": 31, + "second": 56, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T10:31:56.731000-08:00" + }, + "data": { + "start_ts": 1466466680.964, + "end_ts": 1466468087.626, + "label": "home", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 16, + "minute": 51, + "second": 20, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T16:51:20.964000-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 17, + "minute": 14, + "second": 47, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T17:14:47.626000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f31" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/purpose_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607970720.79, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 10, + "minute": 32, + "second": 0, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T10:32:00.790000-08:00" + }, + "data": { + "start_ts": 1466466680.964, + "end_ts": 1466468087.626, + "label": "shopping", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 16, + "minute": 51, + "second": 20, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T16:51:20.964000-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 17, + "minute": 14, + "second": 47, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T17:14:47.626000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f33" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/purpose_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607970724.14, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 10, + "minute": 32, + "second": 4, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T10:32:04.140000-08:00" + }, + "data": { + "start_ts": 1466466680.964, + "end_ts": 1466468087.626, + "label": "home", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 16, + "minute": 51, + "second": 20, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T16:51:20.964000-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 17, + "minute": 14, + "second": 47, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T17:14:47.626000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f3b" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/purpose_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607972445.489, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 11, + "minute": 0, + "second": 45, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T11:00:45.489000-08:00" + }, + "data": { + "start_ts": 1466462052.158904, + "end_ts": 1466462452.708, + "label": "transit_transfer", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 34, + "second": 12, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T15:34:12.158904-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 40, + "second": 52, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T15:40:52.708000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f3d" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/purpose_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607972448.716, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 11, + "minute": 0, + "second": 48, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T11:00:48.716000-08:00" + }, + "data": { + "start_ts": 1466462052.158904, + "end_ts": 1466462452.708, + "label": "home", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 34, + "second": 12, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T15:34:12.158904-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 40, + "second": 52, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T15:40:52.708000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f45" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/purpose_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607972639.249, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 11, + "minute": 3, + "second": 59, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T11:03:59.249000-08:00" + }, + "data": { + "start_ts": 1466463028.216, + "end_ts": 1466464013.985, + "label": "religious", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 50, + "second": 28, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T15:50:28.216000-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 16, + "minute": 6, + "second": 53, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T16:06:53.985000-07:00" + } + }, + { + "_id": { + "$oid": "5fd7b7bdf181cd8feb045f49" + }, + "user_id": { + "$uuid": "1114befce2774cf89b9506dcbff3a52d" + }, + "metadata": { + "key": "manual/purpose_confirm", + "platform": "android", + "read_ts": 0, + "time_zone": "America/Los_Angeles", + "type": "message", + "write_ts": 1607972709.426, + "write_local_dt": { + "year": 2020, + "month": 12, + "day": 14, + "hour": 11, + "minute": 5, + "second": 9, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "write_fmt_time": "2020-12-14T11:05:09.426000-08:00" + }, + "data": { + "start_ts": 1466462970.2807262, + "end_ts": 1466463835.713, + "label": "karate", + "start_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 15, + "minute": 49, + "second": 30, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "start_fmt_time": "2016-06-20T15:49:30.280726-07:00", + "end_local_dt": { + "year": 2016, + "month": 6, + "day": 20, + "hour": 16, + "minute": 3, + "second": 55, + "weekday": 0, + "timezone": "America/Los_Angeles" + }, + "end_fmt_time": "2016-06-20T16:03:55.713000-07:00" + } + } +] From b7e0dc8072dbf44abe2ace6f6845f701b5c11d68 Mon Sep 17 00:00:00 2001 From: Shankari Date: Tue, 15 Dec 2020 16:05:24 -0800 Subject: [PATCH 4/6] Ensure that we have a label in all the test user label entries So that the log statement in the matching code works --- emission/tests/storageTests/TestTripQueries.py | 1 + 1 file changed, 1 insertion(+) diff --git a/emission/tests/storageTests/TestTripQueries.py b/emission/tests/storageTests/TestTripQueries.py index c006e3efe..5ccdcc41f 100644 --- a/emission/tests/storageTests/TestTripQueries.py +++ b/emission/tests/storageTests/TestTripQueries.py @@ -113,6 +113,7 @@ def testUserInputForTripOneInput(self): new_mc = ecul.Userlabel() new_mc["start_ts"] = new_trip.data.start_ts + 1 new_mc["end_ts"] = new_trip.data.end_ts + 1 + new_mc["label"] = "pogo_sticking" ts = esta.TimeSeries.get_time_series(self.testUserId) ts.insert_data(self.testUserId, MODE_CONFIRM_KEY, new_mc) From 986a5340f3c38cb3e3c9d2f9c3c7cc287efab14c Mon Sep 17 00:00:00 2001 From: Shankari Date: Thu, 17 Dec 2020 13:19:04 -0800 Subject: [PATCH 5/6] Significant improvements to the trip matching + Fix the existing TestTripQueries for real data by passing in the correct trip_id instead of the first entry every time + this exposed several flaws, fix them by expanding the check. The corresponding UI fixes are in https://github.com/e-mission/e-mission-phone/commit/75129dbbe4e7d85e7e263f550ec96b3e078c6c01 - Bump up the trip end buffer to 15 minutes since the time threshold default for the distance filter is 10 minutes https://github.com/e-mission/e-mission-docs/issues/476#issuecomment-747115640 - Expand the trip end check to handle the corner case, where if the user input end is significantly after the trip end because of weird sensing issues https://github.com/e-mission/e-mission-docs/issues/476#issuecomment-747222181 + This expansion generated a regression in which a spurious (not a trip) that occurred after the real trip fit the criteria for a match. And since it was confirmed after the real trip, as you would expect while going down the trip list, it was matched preferentially. https://github.com/e-mission/e-mission-docs/issues/476#issuecomment-747587041 Fixed by checking the degree of overlap and rejecting too short matches. Corresponding UI Fixes are in https://github.com/e-mission/e-mission-phone/commit/9438799f8466f5939922a576055570d4ab00b16c + Implemented a function to find the matching trip given a user input - Generalized the validity checks to take both a trip and a user input - Created two curried wrapper functions: one which took a trip and the other which took a userinput - Generalized the final_candidate function to take in the curried function and invoke it, and fix the entry detail logging to match - Create a new function that retrieves all trips within a one day window in each direction based on the user input start timestamp + Added a new test to check the new function - The test loads the data - Runs the pipeline - Loads the user inputs - Finds the trip matching each user input - Note that there can be duplicate entries for each trip as users override their prior inputs, so we may have duplicate matches, as in the case with the purpose confirm objects. Testing done: User input related tests pass --- emission/storage/decorations/trip_queries.py | 107 ++++++++++++++---- .../tests/storageTests/TestTripQueries.py | 54 ++++++++- 2 files changed, 136 insertions(+), 25 deletions(-) diff --git a/emission/storage/decorations/trip_queries.py b/emission/storage/decorations/trip_queries.py index 0467317e5..d56445b96 100644 --- a/emission/storage/decorations/trip_queries.py +++ b/emission/storage/decorations/trip_queries.py @@ -71,6 +71,17 @@ def get_stops_for_trip(key, user_id, trip_id): "data.enter_ts", pymongo.ASCENDING) return [ecwe.Entry(doc) for doc in stop_doc_cursor] +def _get_next_cleaned_trip(ts, trip_obj): + """ + Find the next trip in the timeline + """ + next_place = ts.get_entry_from_id(esda.CLEANED_PLACE_KEY, trip_obj.data.end_place) + if next_place is None: + return None + else: + next_trip = ts.get_entry_from_id(esda.CLEANED_TRIP_KEY, next_place.data.starting_trip) + return next_trip + def get_user_input_for_trip(trip_key, user_id, trip_id, user_input_key): ts = esta.TimeSeries.get_time_series(user_id) trip_obj = ts.get_entry_from_id(trip_key, trip_id) @@ -80,28 +91,51 @@ def get_user_input_for_trip(trip_key, user_id, trip_id, user_input_key): # www/js/diary/services.js # Since that has been tested the most # If we no longer need these checks (maybe with trip editing), we can remove them -def valid_user_input(trip_obj): +def valid_user_input_for_trip(ts, trip_obj, user_input): + # we know that the trip is cleaned so we can use the fmt_time + # but the confirm objects are not necessarily filled out + fmt_ts = lambda ts, tz: arrow.get(ts).to(tz) + logging.debug("Comparing user input %s: %s -> %s, trip %s -> %s, start checks are (%s && %s) and end checks are (%s || %s)" % ( + user_input.data.label, + fmt_ts(user_input.data.start_ts, user_input.metadata.time_zone), + fmt_ts(user_input.data.end_ts, user_input.metadata.time_zone), + trip_obj.data.start_fmt_time, trip_obj.data.end_fmt_time, + (user_input.data.start_ts >= trip_obj.data.start_ts), + (user_input.data.start_ts <= trip_obj.data.end_ts), + (user_input.data.end_ts <= trip_obj.data.end_ts), + ((user_input.data.end_ts - trip_obj.data.end_ts) <= 15 * 60) + )) + start_checks = (user_input.data.start_ts >= trip_obj.data.start_ts and + user_input.data.start_ts >= trip_obj.data.start_ts) + end_checks = (user_input.data.end_ts <= trip_obj.data.end_ts or + ((user_input.data.end_ts - trip_obj.data.end_ts) <= 15 * 60)) + if start_checks and not end_checks: + logging.debug("Handling corner case where start check matches, but end check does not") + next_trip_obj = _get_next_cleaned_trip(ts, trip_obj) + if next_trip_obj is not None: + end_checks = user_input.data.end_ts <= next_trip_obj.data.start_ts + logging.debug("Second level of end checks when the next trip is defined (%s <= %s) = %s" % ( + user_input.data.end_ts, next_trip_obj.data.start_ts, end_checks)) + else: + end_checks = True + logging.debug("Second level of end checks when the next trip is not defined = %s" % end_checks) + if end_checks: + # If we have flipped the values, check to see that there is sufficient overlap + # https://github.com/e-mission/e-mission-docs/issues/476#issuecomment-747587041 + overlapDuration = min(user_input.data.end_ts, trip_obj.data.end_ts) - max(user_input.data.start_ts, trip_obj.data.start_ts) + logging.debug("Flipped endCheck, overlap(%s)/trip(%s) = %s" % + (overlapDuration, trip_obj.data.duration, (overlapDuration / trip_obj.data.duration))); + end_checks = (overlapDuration/trip_obj.data.duration) > 0.5; + return start_checks and end_checks + +def valid_user_input(ts, trip_obj): def curried(user_input): - # we know that the trip is cleaned so we can use the fmt_time - # but the confirm objects are not necessarily filled out - fmt_ts = lambda ts, tz: arrow.get(ts).to(tz) - logging.debug("Comparing user input %s: %s -> %s, trip %s -> %s, checks are (%s) && (%s) || (%s)" % ( - user_input.data.label, - fmt_ts(user_input.data.start_ts, user_input.metadata.time_zone), - fmt_ts(user_input.data.end_ts, user_input.metadata.time_zone), - trip_obj.data.start_fmt_time, trip_obj.data.end_fmt_time, - (user_input.data.start_ts >= trip_obj.data.start_ts), - (user_input.data.end_ts <= trip_obj.data.end_ts), - ((user_input.data.end_ts - trip_obj.data.end_ts) <= 5 * 60) - )) - return (user_input.data.start_ts >= trip_obj.data.start_ts and - (user_input.data.end_ts <= trip_obj.data.end_ts or - ((user_input.data.end_ts - trip_obj.data.end_ts) <= 5 * 60))) + return valid_user_input_for_trip(ts, trip_obj, user_input) return curried -def final_candidate(trip_obj, potential_candidates): +def final_candidate(filter_fn, potential_candidates): potential_candidate_objects = [ecwe.Entry(c) for c in potential_candidates] - extra_filtered_potential_candidates = list(filter(valid_user_input(trip_obj), potential_candidate_objects)) + extra_filtered_potential_candidates = list(filter(filter_fn, potential_candidate_objects)) if len(extra_filtered_potential_candidates) == 0: return None @@ -110,16 +144,18 @@ def final_candidate(trip_obj, potential_candidates): # input before the pipeline is run, and then overwrites after pipeline is # run sorted_pc = sorted(extra_filtered_potential_candidates, key=lambda c:c["metadata"]["write_ts"]) - logging.debug("sorted candidates are %s" % [(c.metadata.write_fmt_time, c.data.label) for c in sorted_pc]) + entry_detail = lambda c: c.data.label if "label" in c.data else c.data.start_fmt_time + logging.debug("sorted candidates are %s" % + [(c.metadata.write_fmt_time, entry_detail(c)) for c in sorted_pc]) most_recent_entry = sorted_pc[-1] - logging.debug("most recent entry is %s, %s" % - (most_recent_entry.metadata.write_fmt_time, most_recent_entry.data.label)) + logging.debug("most recent entry is %s, %s" % + (most_recent_entry.metadata.write_fmt_time, entry_detail(most_recent_entry))) return most_recent_entry def get_user_input_for_trip_object(ts, trip_obj, user_input_key): tq = estt.TimeQuery("data.start_ts", trip_obj.data.start_ts, trip_obj.data.end_ts) potential_candidates = ts.find_entries([user_input_key], tq) - return final_candidate(trip_obj, potential_candidates) + return final_candidate(valid_user_input(ts, trip_obj), potential_candidates) # This is almost an exact copy of get_user_input_for_trip_object, but it # retrieves an interable instead of a dataframe. So almost everything is @@ -128,5 +164,30 @@ def get_user_input_for_trip_object(ts, trip_obj, user_input_key): def get_user_input_from_cache_series(user_id, trip_obj, user_input_key): tq = estt.TimeQuery("data.start_ts", trip_obj.data.start_ts, trip_obj.data.end_ts) + ts = esta.TimeSeries.get_time_series(user_id) potential_candidates = estsc.find_entries(user_id, [user_input_key], tq) - return final_candidate(trip_obj, potential_candidates) + return final_candidate(valid_user_input(ts, trip_obj), potential_candidates) + +def valid_trip(ts, user_input): + def curried(trip_obj): + return valid_user_input_for_trip(ts, trip_obj, user_input) + return curried + +def get_trip_for_user_input_obj(ts, ui_obj): + # the match check that we have is: + # user input can start after trip start + # user input can end before trip end OR user input is within 5 mins of trip end + # Given those considerations, there is no principled query for trip data + # that fits into our query model + # the trip start is before the user input start, but that can go until eternity + # and the trip end can be either before or after the user input end + # we know that the trip end is after the user input start, but again, that + # can go on until now. + # As a workaround, let us assume that the trip start is no more than a day + # before the start of the ui object, which seems like a fairly conservative + # assumption + ONE_DAY = 24 * 60 * 60 + tq = estt.TimeQuery("data.start_ts", ui_obj.data.start_ts - ONE_DAY, + ui_obj.data.start_ts + ONE_DAY) + potential_candidates = ts.find_entries(["analysis/confirmed_trip"], tq) + return final_candidate(valid_trip(ts, ui_obj), potential_candidates) diff --git a/emission/tests/storageTests/TestTripQueries.py b/emission/tests/storageTests/TestTripQueries.py index 5ccdcc41f..a905e1a07 100644 --- a/emission/tests/storageTests/TestTripQueries.py +++ b/emission/tests/storageTests/TestTripQueries.py @@ -13,6 +13,7 @@ import json import bson.json_util as bju import numpy as np +import copy # Our imports import emission.storage.decorations.trip_queries as esdt @@ -224,16 +225,65 @@ def testUserInputRealData(self): pc_label_list = [] for trip_id in ct_df._id: mc = esdt.get_user_input_for_trip(esda.CLEANED_TRIP_KEY, - self.testUserId, ct_df._id[0], "manual/mode_confirm") + self.testUserId, trip_id, "manual/mode_confirm") mc_label_list.append(mc.data.label) pc = esdt.get_user_input_for_trip(esda.CLEANED_TRIP_KEY, - self.testUserId, ct_df._id[0], "manual/purpose_confirm") + self.testUserId, trip_id, "manual/purpose_confirm") pc_label_list.append(pc.data.label) self.assertEqual(mc_label_list, 4 * ['bike']) self.assertEqual(pc_label_list, 4 * ['pick_drop']) + def testUserInputRealDataPostArrival(self): + np.random.seed(61297777) + dataFile = "emission/tests/data/real_examples/shankari_single_positional_indexer.dec-12" + etc.setupRealExample(self, dataFile) + self.testUserId = self.testUUID + # At this point, we have only raw data, no trips + etc.runIntakePipeline(self.testUUID) + # At this point, we have trips + + # Let's retrieve them + ts = esta.TimeSeries.get_time_series(self.testUUID) + ct_df = ts.get_data_df("analysis/confirmed_trip", time_query=None) + self.assertEqual(len(ct_df), 4) + mode_fmt_times = list(ct_df.start_fmt_time) + # corresponds to the walk not a trip + # https://github.com/e-mission/e-mission-docs/issues/476#issuecomment-747115640) + mode_fmt_times.insert(3, None) + purpose_fmt_times = copy.copy(mode_fmt_times) + # corresponds to overrides for the same trip + # they are correctly matched to the same trip + # in the final pipeline step, will override the same entry multiple times + purpose_fmt_times.insert(3, purpose_fmt_times[1]) + purpose_fmt_times.insert(4, purpose_fmt_times[0]) + print("expected_fmt_times: mode = %s" % mode_fmt_times) + print("expected_fmt_times: purpose = %s" % purpose_fmt_times) + + # Now, let's load the mode_confirm and purpose_confirm objects + with open("emission/tests/data/real_examples/shankari_single_positional_indexer.dec-12.mode_confirm") as mcfp: + mode_confirm_list = [ecwe.Entry(mc) for mc in json.load(mcfp, object_hook=bju.object_hook)] + self.assertEqual(len(mode_confirm_list), 5) + + with open("emission/tests/data/real_examples/shankari_single_positional_indexer.dec-12.purpose_confirm") as pcfp: + purpose_confirm_list = [ecwe.Entry(pc) for pc in json.load(pcfp, object_hook=bju.object_hook)] + self.assertEqual(len(purpose_confirm_list), 7) + + mc_trip_start_fmt_time_list = [] + pc_trip_start_fmt_time_list = [] + for mode in mode_confirm_list: + mc_trip = esdt.get_trip_for_user_input_obj(ts, mode) + mc_trip_start_fmt_time_list.append(mc_trip.data.start_fmt_time if mc_trip is not None else None) + + for purpose in purpose_confirm_list: + pc_trip = esdt.get_trip_for_user_input_obj(ts, purpose) + print("Found pc_trip %s" % pc_trip.data.start_fmt_time if pc_trip is not None else None) + pc_trip_start_fmt_time_list.append(pc_trip.data.start_fmt_time if pc_trip is not None else None) + + self.assertEqual(mc_trip_start_fmt_time_list, mode_fmt_times) + self.assertEqual(pc_trip_start_fmt_time_list, purpose_fmt_times) + if __name__ == '__main__': import emission.tests.common as etc etc.configLogging() From ccb98e963e51c6cf9a2d7e26fffc593a48608c8a Mon Sep 17 00:00:00 2001 From: Shankari Date: Thu, 17 Dec 2020 22:44:21 -0800 Subject: [PATCH 6/6] Implement the initial pipeline state to match incoming user inputs with existing confirmed trips + remove the no-op and create our familiar pipeline architecture where we track the last processed input + implement the code to find the matching trip, update its user_input field, and store it back + minor fixes to existing code to support this - pull out the metadata key -> name function so it can be used for both matches - add a new key for confirmed trips + add unit test which tests the same inputs but with the user input being post-loaded Testing done: - All tests pass --- emission/analysis/userinput/matcher.py | 39 +++++++++++++++++-- .../analysis_timeseries_queries.py | 1 + emission/storage/decorations/trip_queries.py | 2 +- .../userInputTests/TestUserInput.py | 14 +++---- 4 files changed, 45 insertions(+), 11 deletions(-) diff --git a/emission/analysis/userinput/matcher.py b/emission/analysis/userinput/matcher.py index 081f48d6d..df4875a1f 100644 --- a/emission/analysis/userinput/matcher.py +++ b/emission/analysis/userinput/matcher.py @@ -10,10 +10,43 @@ import emission.storage.pipeline_queries as epq import emission.core.wrapper.entry as ecwe +obj_to_dict_key = lambda key: key.split("/")[1] + def match_incoming_user_inputs(user_id): time_query = epq.get_time_range_for_incoming_userinput_match(user_id) - print("this is currently a no-op") - epq.mark_incoming_userinput_match_done(user_id, None) + try: + last_user_input_done = match_incoming_inputs(user_id, time_query) + if last_user_input_done is None: + logging.debug("after run, last_user_input_done == None, must be early return") + epq.mark_incoming_userinput_match_done(user_id, None) + else: + epq.mark_incoming_userinput_match_done(user_id, last_user_input_done.metadata.write_ts) + except: + logging.exception("Error while matching incoming user inputs, timestamp is unchanged") + epq.mark_incoming_userinput_match_failed(user_id) + +def match_incoming_inputs(user_id, timerange): + ts = esta.TimeSeries.get_time_series(user_id) + input_key_list = eac.get_config()["userinput.keylist"] + toMatchInputs = [ecwe.Entry(e) for e in ts.find_entries(input_key_list, time_query=timerange)] + logging.debug("Matching %d inputs to trips" % len(toMatchInputs)) + lastInputProcessed = None + if len(toMatchInputs) == 0: + logging.debug("len(toMatchInputs) == 0, early return") + return None + for ui in toMatchInputs: + confirmed_trip = esdt.get_trip_for_user_input_obj(ts, ui) + if confirmed_trip is not None: + input_name = obj_to_dict_key(ui.metadata.key) + confirmed_trip["data"]["user_input"][input_name] = ui.data.label + import emission.storage.timeseries.builtin_timeseries as estbt + estbt.BuiltinTimeSeries.update(confirmed_trip) + else: + logging.warn("No match found for user input %s, moving forward anyway" % ui) + lastInputProcessed = ui + + return lastInputProcessed + def create_confirmed_objects(user_id): time_query = epq.get_time_range_for_confirmed_object_creation(user_id) @@ -59,7 +92,7 @@ def get_user_input_dict(ts, tct, input_key_list): for ikey in input_key_list: matched_userinput = esdt.get_user_input_for_trip_object(ts, tct, ikey) if matched_userinput is not None: - ikey_name = ikey.split("/")[1] + ikey_name = obj_to_dict_key(ikey) tct_userinput[ikey_name] = matched_userinput.data.label logging.debug("for trip %s, returning user input dict %s" % (tct.get_id(), tct_userinput)) return tct_userinput diff --git a/emission/storage/decorations/analysis_timeseries_queries.py b/emission/storage/decorations/analysis_timeseries_queries.py index d4ad9a073..c7def2790 100644 --- a/emission/storage/decorations/analysis_timeseries_queries.py +++ b/emission/storage/decorations/analysis_timeseries_queries.py @@ -25,6 +25,7 @@ CLEANED_STOP_KEY = "analysis/cleaned_stop" CLEANED_UNTRACKED_KEY = "analysis/cleaned_untracked" CLEANED_LOCATION_KEY = "analysis/recreated_location" +CONFIRMED_TRIP_KEY = "analysis/confirmed_trip" METRICS_DAILY_USER_COUNT = "metrics/daily_user_count" METRICS_DAILY_MEAN_COUNT = "metrics/daily_mean_count" METRICS_DAILY_USER_DISTANCE = "metrics/daily_user_distance" diff --git a/emission/storage/decorations/trip_queries.py b/emission/storage/decorations/trip_queries.py index d56445b96..9487b604a 100644 --- a/emission/storage/decorations/trip_queries.py +++ b/emission/storage/decorations/trip_queries.py @@ -146,7 +146,7 @@ def final_candidate(filter_fn, potential_candidates): sorted_pc = sorted(extra_filtered_potential_candidates, key=lambda c:c["metadata"]["write_ts"]) entry_detail = lambda c: c.data.label if "label" in c.data else c.data.start_fmt_time logging.debug("sorted candidates are %s" % - [(c.metadata.write_fmt_time, entry_detail(c)) for c in sorted_pc]) + [{"write_fmt_time": c.metadata.write_fmt_time, "detail": entry_detail(c)} for c in sorted_pc]) most_recent_entry = sorted_pc[-1] logging.debug("most recent entry is %s, %s" % (most_recent_entry.metadata.write_fmt_time, entry_detail(most_recent_entry))) diff --git a/emission/tests/analysisTests/userInputTests/TestUserInput.py b/emission/tests/analysisTests/userInputTests/TestUserInput.py index a4b2938e7..df0288614 100644 --- a/emission/tests/analysisTests/userInputTests/TestUserInput.py +++ b/emission/tests/analysisTests/userInputTests/TestUserInput.py @@ -132,13 +132,13 @@ def testJun20Preload(self): ld = ecwl.LocalDate({'year': 2016, 'month': 6, 'day': 20}) self.checkConfirmedTripsAndSections(dataFile, ld, preload=True) -# def testJun20Postload(self): -# # Same as testJun20Preload, except that the user input arrives after the -# # pipeline is run for the first time, and the matching happens on the -# # next pipeline run -# dataFile = "emission/tests/data/real_examples/shankari_2016-06-20" -# ld = ecwl.LocalDate({'year': 2016, 'month': 6, 'day': 20}) -# self.checkConfirmedTripsAndSections(dataFile, ld, preload=False) + def testJun20Postload(self): + # Same as testJun20Preload, except that the user input arrives after the + # pipeline is run for the first time, and the matching happens on the + # next pipeline run + dataFile = "emission/tests/data/real_examples/shankari_2016-06-20" + ld = ecwl.LocalDate({'year': 2016, 'month': 6, 'day': 20}) + self.checkConfirmedTripsAndSections(dataFile, ld, preload=False) if __name__ == '__main__': etc.configLogging()