Skip to content

Commit

Permalink
Merge pull request #780 from shankari/create_confirmed_trips
Browse files Browse the repository at this point in the history
Make the server matching code consistent with the phone code
  • Loading branch information
shankari authored Dec 18, 2020
2 parents 1f8dd14 + ccb98e9 commit b309285
Show file tree
Hide file tree
Showing 18 changed files with 1,945 additions and 26 deletions.
3 changes: 2 additions & 1 deletion conf/analysis/debug.conf.json.sample
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
"classification.inference.mode.useBusTrainFeatureIndices": true,
"classification.validityAssertions": true,
"output.conversion.validityAssertions": true,
"analysis.result.section.key": "analysis/inferred_section"
"analysis.result.section.key": "analysis/inferred_section",
"userinput.keylist": ["manual/mode_confirm", "manual/purpose_confirm"]
}
Empty file.
99 changes: 99 additions & 0 deletions emission/analysis/userinput/matcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import logging
import copy

# Get the configuration for the classifier
import emission.analysis.config as eac

import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.storage.decorations.trip_queries as esdt
import emission.storage.pipeline_queries as epq
import emission.core.wrapper.entry as ecwe

def obj_to_dict_key(key):
    """
    Return the second "/"-separated component of ``key``,
    e.g. "manual/mode_confirm" -> "mode_confirm".
    """
    return key.split("/")[1]

def match_incoming_user_inputs(user_id):
    """
    Pipeline stage entry point: match newly arrived user inputs for
    ``user_id`` to trips.

    The time range to process comes from the pipeline state. On success the
    stage is marked done with the ``write_ts`` of the last processed input
    (or None if there was nothing to process); on any exception the stage is
    marked as failed.
    """
    time_query = epq.get_time_range_for_incoming_userinput_match(user_id)
    try:
        last_input = match_incoming_inputs(user_id, time_query)
        if last_input is not None:
            done_ts = last_input.metadata.write_ts
        else:
            logging.debug("after run, last_user_input_done == None, must be early return")
            done_ts = None
        epq.mark_incoming_userinput_match_done(user_id, done_ts)
    except:
        logging.exception("Error while matching incoming user inputs, timestamp is unchanged")
        epq.mark_incoming_userinput_match_failed(user_id)

def match_incoming_inputs(user_id, timerange):
    """
    Attach each user input found in ``timerange`` to its confirmed trip.

    The input keys to look for are read from the "userinput.keylist" entry of
    the analysis config. For every input, the matching confirmed trip (if
    any) gets ``data.user_input[<name>]`` set to the input's label and is
    written back through ``BuiltinTimeSeries.update``.

    :param user_id: the user whose inputs should be matched
    :param timerange: time query restricting the inputs to consider
    :return: the last input that was looked at (matched or not), or None if
        there were no inputs in the range
    """
    ts = esta.TimeSeries.get_time_series(user_id)
    input_key_list = eac.get_config()["userinput.keylist"]
    toMatchInputs = [ecwe.Entry(e) for e in ts.find_entries(input_key_list, time_query=timerange)]
    logging.debug("Matching %d inputs to trips" % len(toMatchInputs))
    if len(toMatchInputs) == 0:
        logging.debug("len(toMatchInputs) == 0, early return")
        return None

    # Kept as a function-local import like the original, but executed once
    # instead of once per matched input.
    import emission.storage.timeseries.builtin_timeseries as estbt

    lastInputProcessed = None
    for ui in toMatchInputs:
        confirmed_trip = esdt.get_trip_for_user_input_obj(ts, ui)
        if confirmed_trip is not None:
            input_name = obj_to_dict_key(ui.metadata.key)
            confirmed_trip["data"]["user_input"][input_name] = ui.data.label
            estbt.BuiltinTimeSeries.update(confirmed_trip)
        else:
            # logging.warn is a deprecated alias; use logging.warning
            logging.warning("No match found for user input %s, moving forward anyway" % ui)
        # unmatched inputs still advance the marker so we don't retry them forever
        lastInputProcessed = ui

    return lastInputProcessed


def create_confirmed_objects(user_id):
    """
    Pipeline stage entry point: create confirmed objects for ``user_id``.

    The time range to process comes from the pipeline state. On success the
    stage is marked done with the ``end_ts`` of the last cleaned trip that was
    converted (or None if there was nothing to convert); on any exception the
    stage is marked as failed.
    """
    time_query = epq.get_time_range_for_confirmed_object_creation(user_id)
    try:
        last_trip = create_confirmed_trips(user_id, time_query)
        if last_trip is not None:
            done_ts = last_trip.data.end_ts
        else:
            logging.debug("after run, last_cleaned_trip_done == None, must be early return")
            done_ts = None
        epq.mark_confirmed_object_creation_done(user_id, done_ts)
    except:
        logging.exception("Error while creating confirmed objects, timestamp is unchanged")
        epq.mark_confirmed_object_creation_failed(user_id)

def create_confirmed_trips(user_id, timerange):
    """
    Create one "analysis/confirmed_trip" entry per cleaned trip in ``timerange``.

    Each confirmed trip is a copy of the cleaned trip (without its ``_id``)
    whose ``data`` additionally carries ``cleaned_trip`` (the id of the source
    trip) and ``user_input`` (see ``get_user_input_dict``).

    :param user_id: the user whose cleaned trips should be converted
    :param timerange: time query restricting the cleaned trips to consider
    :return: the last cleaned trip that was converted, or None if there were
        no cleaned trips in the range
    """
    ts = esta.TimeSeries.get_time_series(user_id)
    toConfirmTrips = esda.get_entries(esda.CLEANED_TRIP_KEY, user_id,
                                      time_query=timerange)
    logging.debug("Converting %d cleaned trips to confirmed ones" % len(toConfirmTrips))
    if len(toConfirmTrips) == 0:
        logging.debug("len(toConfirmTrips) == 0, early return")
        return None

    lastTripProcessed = None
    input_key_list = eac.get_config()["userinput.keylist"]
    for tct in toConfirmTrips:
        # Copy the trip and fill in the new values.
        # This must be a deep copy: with a shallow copy the nested "metadata"
        # and "data" dicts are shared, so the assignments below would also
        # rewrite the cleaned trip we are reading from (and returning).
        confirmed_trip_dict = copy.deepcopy(tct)
        del confirmed_trip_dict["_id"]
        confirmed_trip_dict["metadata"]["key"] = "analysis/confirmed_trip"
        confirmed_trip_dict["data"]["cleaned_trip"] = tct.get_id()
        confirmed_trip_dict["data"]["user_input"] = \
            get_user_input_dict(ts, tct, input_key_list)
        confirmed_trip_entry = ecwe.Entry(confirmed_trip_dict)
        # save the entry
        ts.insert(confirmed_trip_entry)
        # if everything is successful, then update the last successful trip
        lastTripProcessed = tct

    return lastTripProcessed

def get_user_input_dict(ts, tct, input_key_list):
    """
    Collect the user input labels that match the trip ``tct``.

    For every key in ``input_key_list`` that has a matching user input, the
    result maps the short key name (see ``obj_to_dict_key``) to that input's
    label. Keys without a match are left out.
    """
    labels_by_name = {}
    for input_key in input_key_list:
        match = esdt.get_user_input_for_trip_object(ts, tct, input_key)
        if match is None:
            continue
        short_name = obj_to_dict_key(input_key)
        labels_by_name[short_name] = match.data.label
    logging.debug("for trip %s, returning user input dict %s" % (tct.get_id(), labels_by_name))
    return labels_by_name

30 changes: 30 additions & 0 deletions emission/core/wrapper/confirmedsection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from future import standard_library
standard_library.install_aliases()
from builtins import *
import emission.core.wrapper.cleanedsection as ecwc
import emission.core.wrapper.wrapperbase as ecwb
import emission.core.wrapper.modeprediction as ecwm

class Confirmedsection(ecwc.Cleanedsection):
    """
    Data wrapper for confirmed sections: a cleaned section extended with the
    inferred and user-confirmed mode fields listed below.
    """
    # NOTE(review): this is the same dict object as Cleanedsection.props, so
    # the update below also adds these keys to the parent class. Left as-is
    # because it looks like the convention used by the sibling wrappers -
    # confirm before changing.
    props = ecwc.Cleanedsection.props
    # for a detailed explanation of what the various modes represent,
    # see https://github.com/e-mission/e-mission-docs/issues/476
    props.update({"cleaned_section": ecwb.WrapperBase.Access.WORM,
                  "inferred_section": ecwb.WrapperBase.Access.WORM,
                  "inferred_mode": ecwb.WrapperBase.Access.WORM, # inferred by mode inference algo
                  "confirmed_mode": ecwb.WrapperBase.Access.WORM, # confirmed by user
                  # mode to be used for analysis; confirmed mode if we know factors for it, inferred mode otherwise
                  "analysis_mode": ecwb.WrapperBase.Access.WORM,
                  # mode for user display; inferred mode if not confirmed; confirmed mode otherwise
                  "display_mode": ecwb.WrapperBase.Access.WORM
                  })

    enums = {"inferred_mode": ecwm.PredictedModeTypes,
             "analysis_mode": ecwm.PredictedModeTypes}

    def _populateDependencies(self):
        # super() must name this class; it is not an attribute of the
        # cleanedsection module
        super(Confirmedsection, self)._populateDependencies()
25 changes: 25 additions & 0 deletions emission/core/wrapper/confirmedtrip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from future import standard_library
standard_library.install_aliases()
from builtins import *
import emission.core.wrapper.trip as ecwt
import emission.core.wrapper.wrapperbase as ecwb

class Confirmedtrip(ecwt.Trip):
    """
    Data wrapper for "analysis/confirmed_trip" entries: a trip extended with
    references to the trips it was derived from and the matched user input.
    """
    props = ecwt.Trip.props
    # every additional field is write-once-read-many
    props.update(dict.fromkeys([
        "raw_trip",
        "cleaned_trip",
        # the confirmed section that is the "primary"
        # https://github.com/e-mission/e-mission-docs/issues/476#issuecomment-738120752
        "primary_section",
        "inferred_primary_mode",
        # the user input will have all `manual/*` entries
        # let's make that be somewhat flexible instead of hardcoding into the data model
        "user_input",
    ], ecwb.WrapperBase.Access.WORM))

    def _populateDependencies(self):
        super(Confirmedtrip, self)._populateDependencies()
4 changes: 4 additions & 0 deletions emission/core/wrapper/entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ def _getData2Wrapper():
# running the inference step
"analysis/inferred_section": "inferredsection",
### ** END: prediction objects
### ** BEGIN: confirmed objects which combine inferred and user input values
"analysis/confirmed_trip": "confirmedtrip",
"analysis/confirmed_section": "confirmedsection"
### ** END: confirmed objects which combine inferred and user input values
}

@staticmethod
Expand Down
2 changes: 2 additions & 0 deletions emission/core/wrapper/pipelinestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@

class PipelineStages(enum.Enum):
USERCACHE = 0
USER_INPUT_MATCH_INCOMING = 12
ACCURACY_FILTERING = 6
TRIP_SEGMENTATION = 1
SECTION_SEGMENTATION = 2
JUMP_SMOOTHING = 3
CLEAN_RESAMPLING = 11
MODE_INFERENCE = 4
CREATE_CONFIRMED_OBJECTS = 13
TOUR_MODEL = 5
ALTERNATIVES = 10
USER_MODEL = 7
Expand Down
16 changes: 16 additions & 0 deletions emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.timeseries.aggregate_timeseries as estag

import emission.analysis.userinput.matcher as eaum
import emission.analysis.intake.cleaning.filter_accuracy as eaicf
import emission.analysis.intake.segmentation.trip_segmentation as eaist
import emission.analysis.intake.segmentation.section_segmentation as eaiss
Expand Down Expand Up @@ -88,6 +89,13 @@ def run_intake_pipeline_for_user(uuid):
esds.store_pipeline_time(uuid, ecwp.PipelineStages.USERCACHE.name,
time.time(), uct.elapsed)

with ect.Timer() as uit:
logging.info("*" * 10 + "UUID %s: updating incoming user inputs" % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: updating incoming user inputs" % uuid + "*" * 10)
eaum.match_incoming_user_inputs(uuid)

esds.store_pipeline_time(uuid, ecwp.PipelineStages.USER_INPUT_MATCH_INCOMING.name,
time.time(), uct.elapsed)

# Hack until we delete these spurious entries
# https://github.com/e-mission/e-mission-server/issues/407#issuecomment-2484868
Expand Down Expand Up @@ -150,6 +158,14 @@ def run_intake_pipeline_for_user(uuid):
esds.store_pipeline_time(uuid, ecwp.PipelineStages.MODE_INFERENCE.name,
time.time(), crt.elapsed)

with ect.Timer() as crt:
logging.info("*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10)
eaum.create_confirmed_objects(uuid)

esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS.name,
time.time(), crt.elapsed)

with ect.Timer() as ogt:
logging.info("*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
CLEANED_STOP_KEY = "analysis/cleaned_stop"
CLEANED_UNTRACKED_KEY = "analysis/cleaned_untracked"
CLEANED_LOCATION_KEY = "analysis/recreated_location"
CONFIRMED_TRIP_KEY = "analysis/confirmed_trip"
METRICS_DAILY_USER_COUNT = "metrics/daily_user_count"
METRICS_DAILY_MEAN_COUNT = "metrics/daily_mean_count"
METRICS_DAILY_USER_DISTANCE = "metrics/daily_user_distance"
Expand Down
120 changes: 102 additions & 18 deletions emission/storage/decorations/trip_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from builtins import *
import logging
import pymongo
import arrow

import emission.storage.timeseries.timequery as estt

Expand Down Expand Up @@ -70,27 +71,91 @@ def get_stops_for_trip(key, user_id, trip_id):
"data.enter_ts", pymongo.ASCENDING)
return [ecwe.Entry(doc) for doc in stop_doc_cursor]

def _get_next_cleaned_trip(ts, trip_obj):
    """
    Return the cleaned trip that starts at the end place of ``trip_obj``,
    or None if the end place cannot be found.
    """
    end_place = ts.get_entry_from_id(esda.CLEANED_PLACE_KEY, trip_obj.data.end_place)
    if end_place is not None:
        return ts.get_entry_from_id(esda.CLEANED_TRIP_KEY, end_place.data.starting_trip)
    return None

def get_user_input_for_trip(trip_key, user_id, trip_id, user_input_key):
    """
    Look up the trip ``trip_id`` (stored under ``trip_key``) for ``user_id``
    and return the user input of type ``user_input_key`` that matches it,
    as determined by ``get_user_input_for_trip_object``.
    """
    user_ts = esta.TimeSeries.get_time_series(user_id)
    trip_entry = user_ts.get_entry_from_id(trip_key, trip_id)
    return get_user_input_for_trip_object(user_ts, trip_entry, user_input_key)

def get_user_input_for_trip_object(ts, trip_obj, user_input_key):
tq = estt.TimeQuery("data.start_ts", trip_obj.data.start_ts, trip_obj.data.end_ts)
# Additional checks to be consistent with the phone code
# www/js/diary/services.js
# Since that has been tested the most
# If we no longer need these checks (maybe with trip editing), we can remove them
def valid_user_input_for_trip(ts, trip_obj, user_input):
    """
    Decide whether ``user_input`` belongs to ``trip_obj``.

    The input matches when
      - it starts within the trip (trip start <= input start <= trip end), and
      - it ends before the trip ends, or no more than 15 minutes after.
    If the start matches but the end does not, the input is still accepted
    when it ends before the next cleaned trip starts (or there is no next
    trip) AND it overlaps more than half of the trip's duration.

    :param ts: timeseries used to look up the next cleaned trip
    :param trip_obj: trip entry; reads ``data.start_ts``, ``data.end_ts``,
        ``data.duration`` (and the fmt_time fields for debug logging)
    :param user_input: user input entry; reads ``data.start_ts``,
        ``data.end_ts`` (and ``data.label`` / ``metadata.time_zone`` for
        debug logging)
    :return: True if the input matches the trip, False otherwise
    """
    ui_start_ts = user_input.data.start_ts
    ui_end_ts = user_input.data.end_ts
    trip_start_ts = trip_obj.data.start_ts
    trip_end_ts = trip_obj.data.end_ts

    # This runs once per (trip, candidate) pair, so only build the formatted
    # timestamps when the message will actually be emitted
    if logging.getLogger().isEnabledFor(logging.DEBUG):
        # we know that the trip is cleaned so we can use the fmt_time
        # but the confirm objects are not necessarily filled out
        fmt_ts = lambda epoch_ts, tz: arrow.get(epoch_ts).to(tz)
        logging.debug("Comparing user input %s: %s -> %s, trip %s -> %s, start checks are (%s && %s) and end checks are (%s || %s)" % (
            user_input.data.label,
            fmt_ts(ui_start_ts, user_input.metadata.time_zone),
            fmt_ts(ui_end_ts, user_input.metadata.time_zone),
            trip_obj.data.start_fmt_time, trip_obj.data.end_fmt_time,
            (ui_start_ts >= trip_start_ts),
            (ui_start_ts <= trip_end_ts),
            (ui_end_ts <= trip_end_ts),
            ((ui_end_ts - trip_end_ts) <= 15 * 60)
        ))

    # The upper bound used to be a second copy of the lower bound, which let
    # inputs that start after the trip has ended through
    start_checks = (ui_start_ts >= trip_start_ts and
                    ui_start_ts <= trip_end_ts)
    end_checks = (ui_end_ts <= trip_end_ts or
                  ((ui_end_ts - trip_end_ts) <= 15 * 60))
    if start_checks and not end_checks:
        logging.debug("Handling corner case where start check matches, but end check does not")
        next_trip_obj = _get_next_cleaned_trip(ts, trip_obj)
        if next_trip_obj is not None:
            end_checks = ui_end_ts <= next_trip_obj.data.start_ts
            logging.debug("Second level of end checks when the next trip is defined (%s <= %s) = %s" % (
                ui_end_ts, next_trip_obj.data.start_ts, end_checks))
        else:
            end_checks = True
            logging.debug("Second level of end checks when the next trip is not defined = %s" % end_checks)
        if end_checks:
            # If we have flipped the values, check to see that there is sufficient overlap
            # https://github.com/e-mission/e-mission-docs/issues/476#issuecomment-747587041
            overlapDuration = min(ui_end_ts, trip_end_ts) - max(ui_start_ts, trip_start_ts)
            trip_duration = trip_obj.data.duration
            # a zero-length trip has no meaningful overlap ratio; treat it as
            # insufficient overlap instead of dividing by zero
            overlap_ratio = (overlapDuration / trip_duration) if trip_duration > 0 else 0
            logging.debug("Flipped endCheck, overlap(%s)/trip(%s) = %s" %
                (overlapDuration, trip_duration, overlap_ratio))
            end_checks = overlap_ratio > 0.5
    return start_checks and end_checks

def valid_user_input(ts, trip_obj):
    """
    Return a one-argument predicate that checks whether a given user input
    is valid for the fixed trip ``trip_obj``.
    """
    return lambda user_input: valid_user_input_for_trip(ts, trip_obj, user_input)

def final_candidate(filter_fn, potential_candidates):
    """
    Pick the most recently written candidate that passes ``filter_fn``.

    :param filter_fn: predicate applied to each candidate after it has been
        wrapped in an ``Entry``
    :param potential_candidates: iterable of raw candidate documents
    :return: the passing ``Entry`` with the largest ``metadata.write_ts``,
        or None if no candidate passes
    """
    potential_candidate_objects = [ecwe.Entry(c) for c in potential_candidates]
    extra_filtered_potential_candidates = [c for c in potential_candidate_objects if filter_fn(c)]
    if len(extra_filtered_potential_candidates) == 0:
        return None

    # In general, all candidates will have the same start_ts, so no point in
    # sorting by it. Only exception to general rule is when user first provides
    # input before the pipeline is run, and then overwrites after pipeline is
    # run
    sorted_pc = sorted(extra_filtered_potential_candidates, key=lambda c: c["metadata"]["write_ts"])
    # user inputs have a label; trips do not, so fall back to their start time
    entry_detail = lambda c: c.data.label if "label" in c.data else c.data.start_fmt_time
    logging.debug("sorted candidates are %s" %
        [{"write_fmt_time": c.metadata.write_fmt_time, "detail": entry_detail(c)} for c in sorted_pc])
    most_recent_entry = sorted_pc[-1]
    logging.debug("most recent entry is %s, %s" %
        (most_recent_entry.metadata.write_fmt_time, entry_detail(most_recent_entry)))
    return most_recent_entry

sorted_pc = potential_candidates.sort_values(by="metadata_write_ts")
most_recent_entry_id = potential_candidates._id.iloc[-1]
logging.debug("most recent entry has id %s" % most_recent_entry_id)
ret_val = ts.get_entry_from_id(user_input_key, most_recent_entry_id)
logging.debug("and is mapped to entry %s" % ret_val)
return ret_val
def get_user_input_for_trip_object(ts, trip_obj, user_input_key):
    """
    Return the most recently written user input of type ``user_input_key``
    that is valid for ``trip_obj``, or None if there is none.

    Candidates are the entries whose ``data.start_ts`` lies between the
    trip's start and end; they are then filtered by
    ``valid_user_input_for_trip``.
    """
    trip_data = trip_obj.data
    starts_during_trip = estt.TimeQuery("data.start_ts", trip_data.start_ts, trip_data.end_ts)
    candidate_inputs = ts.find_entries([user_input_key], starts_during_trip)
    return final_candidate(valid_user_input(ts, trip_obj), candidate_inputs)

# This is almost an exact copy of get_user_input_for_trip_object, but it
# retrieves an iterable instead of a dataframe. So almost everything is
Expand All @@ -99,11 +164,30 @@ def get_user_input_for_trip_object(ts, trip_obj, user_input_key):

def get_user_input_from_cache_series(user_id, trip_obj, user_input_key):
    """
    Same as ``get_user_input_for_trip_object``, but the candidates are read
    through ``estsc.find_entries`` for ``user_id`` instead of from the
    timeseries.

    :return: the most recently written user input of type ``user_input_key``
        that is valid for ``trip_obj``, or None if there is none
    """
    tq = estt.TimeQuery("data.start_ts", trip_obj.data.start_ts, trip_obj.data.end_ts)
    # the timeseries is only needed by the validity check (next-trip lookup)
    ts = esta.TimeSeries.get_time_series(user_id)
    potential_candidates = estsc.find_entries(user_id, [user_input_key], tq)
    # Selection is delegated to final_candidate so that the same validity
    # filter and write_ts ordering apply here as for the timeseries path
    return final_candidate(valid_user_input(ts, trip_obj), potential_candidates)

def valid_trip(ts, user_input):
    """
    Return a one-argument predicate that checks whether a given trip is a
    valid match for the fixed ``user_input``.
    """
    return lambda trip_obj: valid_user_input_for_trip(ts, trip_obj, user_input)

def get_trip_for_user_input_obj(ts, ui_obj):
    """
    Return the confirmed trip that the user input ``ui_obj`` belongs to,
    or None if no confirmed trip matches.

    Candidates are the confirmed trips whose ``data.start_ts`` is within one
    day of the input's start; they are then filtered by
    ``valid_user_input_for_trip``.
    """
    # the match check that we have is:
    # user input can start after trip start
    # user input can end before trip end OR user input is within 15 mins of trip end
    # Given those considerations, there is no principled query for trip data
    # that fits into our query model
    # the trip start is before the user input start, but that can go until eternity
    # and the trip end can be either before or after the user input end
    # we know that the trip end is after the user input start, but again, that
    # can go on until now.
    # As a workaround, let us assume that the trip start is no more than a day
    # away from the start of the ui object (the query below looks one day in
    # each direction), which seems like a fairly conservative assumption
    ONE_DAY = 24 * 60 * 60
    tq = estt.TimeQuery("data.start_ts", ui_obj.data.start_ts - ONE_DAY,
                        ui_obj.data.start_ts + ONE_DAY)
    potential_candidates = ts.find_entries([esda.CONFIRMED_TRIP_KEY], tq)
    return final_candidate(valid_trip(ts, ui_obj), potential_candidates)
Loading

0 comments on commit b309285

Please sign in to comment.