Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 18 #54

Merged
merged 25 commits into from
Mar 29, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3f5be09
add config.py and read_data_from_file.py
Mar 9, 2021
ed63666
finalize dump_data_to_file
Mar 10, 2021
4b0a640
update cli args
Mar 12, 2021
254f410
add capability to export raw data to file when additional parameters …
Mar 12, 2021
9cb24ee
successfully replicate behavior of SpecDetails, handle duplicate spec…
Mar 15, 2021
ec28776
significant refactors + add fill transitions functionality
Mar 16, 2021
d78e497
create v2 based on new ServerSpecDetails class and refactored PhoneVi…
Mar 17, 2021
06b8185
finish script refactor, create general structure for dumping based on…
Mar 18, 2021
6b909cf
move read_until_done to spec_details
Mar 19, 2021
ffa5d27
add FileSpecDetails, configure proper output from PhoneView map
Mar 19, 2021
c0eb143
Add simple unit test with a mocking example
shankari Mar 22, 2021
e8c9fe0
add in code review changes aside from read_until_done
Mar 22, 2021
5746c99
Merge pull request #10 from MobilityNet/add_simple_unit_test
Mar 22, 2021
80c8638
add documentation to parser for dump_data_to_file
Mar 22, 2021
9a7ff44
fix PhoneView to use time series keys from emission
Mar 23, 2021
1821b87
mostly working FileSpecDetails, constants added to PhoneView
Mar 24, 2021
bee4c2a
fully working FileSpecDetails
Mar 25, 2021
1252e2d
updated notebooks + incorporated changes on PR
Mar 26, 2021
24832dc
Restore full tree display
shankari Mar 26, 2021
e942ab8
remove outputs from Evaluations_power_boxplots
Mar 29, 2021
0cf5836
Merge branch 'issue18' of https://github.com/singhish/mobilitynet-ana…
Mar 29, 2021
46c67c8
revert _master notebooks
Mar 29, 2021
e2d72f8
remove .DS_Store
Mar 29, 2021
550ea7a
remove .DS_Store from subfolders
Mar 29, 2021
fb7007d
Add quotes around the `DATASTORE_LOC` as well
shankari Mar 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions bin/dump_data_to_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import json
import os
import requests
import time
import arrow
import argparse
import sys; sys.path.append("..")
from emeval.input.spec_details import ServerSpecDetails
from emeval.input.phone_view import PhoneView


DEFAULT_SPEC_USER = "[email protected]"
DATASTORE_URL = None

def dump_to_file(data, spec_id, user, key, start_ts, end_ts):
    """
    Write the retrieved entries as pretty-printed JSON to
    data/<spec_id>/<user>/<key>/<start_ts>_<end_ts>.json, creating the
    directory tree (relative to the script's working directory) as needed.
    """
    # a key such as "background/location" contains a slash; swap it for a
    # tilde so the whole key stays a single directory name
    key = key.replace("/", "~")

    target_dir = os.path.join("data", spec_id, user, key)
    os.makedirs(target_dir, exist_ok=True)

    out_file = os.path.join(target_dir, f"{start_ts}_{end_ts}.json")

    print(f"Creating {out_file=}...")

    with open(out_file, "w") as fp:
        json.dump(data, fp, indent=4)


def retrieve_data_from_server(user, key, start_ts, end_ts):
    """
    Standalone function similar to ServerSpecDetails's implementation of retrieve_data.
    Used when --key, --user, --start-ts, and --end-ts are specified & for retrieving all specs
    """
    post_body = {
        "user": user,
        "key_list": [key],
        "start_time": start_ts,
        "end_time": end_ts
    }

    def query_once():
        # one POST attempt against the datastore; raises on an HTTP error status
        response = requests.post(f"{DATASTORE_URL}/datastreams/find_entries/timestamp", json=post_body)
        print(f"{response=}")
        response.raise_for_status()
        return response.json()["phone_data"]

    print(f"Retrieving data for: {post_body=}")
    try:
        data = query_once()
    except Exception as e:
        # single retry after a pause; any second failure propagates
        print(f"Got {type(e).__name__}: {e}, retrying...")
        time.sleep(10)
        data = query_once()

    # surface metadata.write_ts inside the data payload for downstream use
    for entry in data:
        entry["data"]["write_ts"] = entry["metadata"]["write_ts"]

    print(f"Found {len(data)} entries")
    return data


def get_all_spec_ids():
    """
    Query the datastore for every evaluation spec belonging to the default
    spec user and return the set of their unique spec ids.
    """
    spec_entries = retrieve_data_from_server(
        DEFAULT_SPEC_USER, "config/evaluation_spec", 0, arrow.get().timestamp)
    return {entry["data"]["label"]["id"] for entry in spec_entries}


def parse_args():
    """
    Build and evaluate the CLI. --datastore-url and --spec-id are always
    available; the --key/--user/--start-ts/--end-ts group is only registered
    (and then each member is mandatory) when at least one of them appears on
    the command line.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument("--datastore-url", type=str, default="http://localhost:8080")
    parser.add_argument("--spec-id", type=str)

    # all-or-nothing group: specifying any one of these flags forces the rest
    grouped_flags = {"--key": str, "--user": str, "--start-ts": float, "--end-ts": float}
    if any(flag in sys.argv for flag in grouped_flags):
        for flag, flag_type in grouped_flags.items():
            parser.add_argument(flag, type=flag_type, required=True)

    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()

    # configure the module-level URL used by retrieve_data_from_server
    DATASTORE_URL = args.datastore_url

    # when --spec-id is given, ensure it actually exists on the datastore;
    # otherwise operate on every spec found there
    spec_ids = get_all_spec_ids()
    if args.spec_id:
        assert args.spec_id in spec_ids, f"spec_id `{args.spec_id}` not found within current datastore instance"
        spec_ids = [args.spec_id]

    if "--key" in sys.argv:
        # explicit key/user/timestamp range: query the server directly and
        # dump the same slice under each selected spec id
        for s_id in spec_ids:
            entries = retrieve_data_from_server(args.user, args.key, args.start_ts, args.end_ts)
            dump_to_file(entries, s_id, args.user, args.key, args.start_ts, args.end_ts)
    else:
        # otherwise walk each spec's phone view and dump every "*_entries"
        # list found in its evaluation ranges
        for sd in (ServerSpecDetails(DATASTORE_URL, DEFAULT_SPEC_USER, s_id) for s_id in spec_ids):
            pv = PhoneView(sd)
            for phone_os, phone_map in pv.map().items():
                for phone_label, phone_detail_map in phone_map.items():
                    for r in phone_detail_map["evaluation_ranges"]:
                        for key in [k for k in r.keys() if "_entries" in k]:
                            dump_to_file(r[key], sd.CURR_SPEC_ID, phone_label, key, r["start_ts"], r["end_ts"])
32 changes: 6 additions & 26 deletions emeval/input/phone_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def fill_transitions(self):
print("Reading data for %s phones" % phoneOS)
for phone_label in phone_map:
print("Loading transitions for phone %s" % phone_label)
curr_phone_transitions = self.spec_details.retrieve_data_from_server(
curr_phone_transitions = self.spec_details.retrieve_data(
phone_label, ["manual/evaluation_transition"],
self.spec_details.eval_start_ts, self.spec_details.eval_end_ts)
curr_phone_role = phone_map[phone_label]
Expand Down Expand Up @@ -279,7 +279,7 @@ def fill_battery_df(self, storage_key):
for phone_label in phone_map:
curr_calibration_ranges = phone_map[phone_label]["{}_ranges".format(storage_key)]
for r in curr_calibration_ranges:
battery_entries = self.spec_details.retrieve_data_from_server(phone_label, ["background/battery"], r["start_ts"], r["end_ts"])
battery_entries = self.spec_details.retrieve_data(phone_label, ["background/battery"], r["start_ts"], r["end_ts"])
# ios entries before running the pipeline are marked with battery_level_ratio, which is a float from 0 ->1
# convert it to % to be consistent with android and easier to understand
if phoneOS == "ios":
Expand All @@ -293,36 +293,16 @@ def fill_battery_df(self, storage_key):
battery_df["hr"] = (battery_df.ts-r["start_ts"])/3600.0
r["battery_df"] = battery_df

def _read_until_done(self, phone_label, key, start_ts, end_ts):
all_done = False
location_entries = []
curr_start_ts = start_ts
prev_retrieved_count = 0

while not all_done:
print("About to retrieve data for %s from %s -> %s" % (phone_label, curr_start_ts, end_ts))
curr_location_entries = self.spec_details.retrieve_data_from_server(phone_label, [key], curr_start_ts, end_ts)
print("Retrieved %d entries with timestamps %s..." % (len(curr_location_entries), [cle["data"]["ts"] for cle in curr_location_entries[0:10]]))
if len(curr_location_entries) == 0 or len(curr_location_entries) == 1:
all_done = True
else:
location_entries.extend(curr_location_entries)
new_start_ts = curr_location_entries[-1]["metadata"]["write_ts"]
assert new_start_ts > curr_start_ts
curr_start_ts = new_start_ts
prev_retrieved_count = len(curr_location_entries)
return location_entries

def fill_location_df(self, storage_key):
for phoneOS, phone_map in self.phone_view_map.items():
print("Processing data for %s phones" % phoneOS)
for phone_label in phone_map:
curr_calibration_ranges = phone_map[phone_label]["{}_ranges".format(storage_key)]
for r in curr_calibration_ranges:
r["location_entries"] = self._read_until_done(phone_label,
r["location_entries"] = self.spec_details.read_until_done(phone_label,
shankari marked this conversation as resolved.
Show resolved Hide resolved
"background/location",
r["start_ts"], r["end_ts"])
r["filtered_location_entries"] = self._read_until_done(
r["filtered_location_entries"] = self.spec_details.read_until_done(
phone_label,
"background/filtered_location",
r["start_ts"], r["end_ts"])
Expand Down Expand Up @@ -350,7 +330,7 @@ def fill_motion_activity_df(self, storage_key):

while not all_done:
print("About to retrieve data for %s from %s -> %s" % (phone_label, curr_start_ts, r["end_ts"]))
curr_motion_activity_entries = self.spec_details.retrieve_data_from_server(phone_label, ["background/motion_activity"], curr_start_ts, r["end_ts"])
curr_motion_activity_entries = self.spec_details.retrieve_data(phone_label, ["background/motion_activity"], curr_start_ts, r["end_ts"])
print("Retrieved %d entries with timestamps %s..." % (len(curr_motion_activity_entries), [cle["metadata"]["write_ts"] for cle in curr_motion_activity_entries[0:10]]))
if len(curr_motion_activity_entries) == 0 or len(curr_motion_activity_entries) == 1 or len(curr_motion_activity_entries) == prev_retrieved_count:
all_done = True
Expand All @@ -373,7 +353,7 @@ def fill_transition_df(self, storage_key):
for phone_label in phone_map:
curr_calibration_ranges = phone_map[phone_label]["{}_ranges".format(storage_key)]
for r in curr_calibration_ranges:
transition_entries = self.spec_details.retrieve_data_from_server(
transition_entries = self.spec_details.retrieve_data(
phone_label, ["statemachine/transition"], r["start_ts"], r["end_ts"])
# ios entries before running the pipeline are marked with battery_level_ratio, which is a float from 0 ->1
# convert it to % to be consistent with android and easier to understand
Expand Down
109 changes: 71 additions & 38 deletions emeval/input/spec_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,54 +6,29 @@
import requests
import shapely as shp
import geojson as gj
from abc import ABC, abstractmethod
import json
import os
shankari marked this conversation as resolved.
Show resolved Hide resolved

class SpecDetails:

class SpecDetails(ABC):
def __init__(self, datastore_url, author_email, spec_id):
self.DATASTORE_URL = datastore_url
self.AUTHOR_EMAIL = author_email
self.CURR_SPEC_ID = spec_id
self.curr_spec_entry = self.get_current_spec()
self.populate_spec_details(self.curr_spec_entry)

def retrieve_data_from_server(self, user_label, key_list, start_ts, end_ts):
post_msg = {
"user": user_label,
"key_list": key_list,
"start_time": start_ts,
"end_time": end_ts
}
print("About to retrieve messages using %s" % post_msg)
try:
response = requests.post(self.DATASTORE_URL+"/datastreams/find_entries/timestamp", json=post_msg)
print("response = %s" % response)
response.raise_for_status()
ret_list = response.json()["phone_data"]
except Exception as e:
print("Got %s error %s, retrying" % (type(e).__name__, e))
time.sleep(10)
response = requests.post(self.DATASTORE_URL+"/datastreams/find_entries/timestamp", json=post_msg)
print("response = %s" % response)
response.raise_for_status()
ret_list = response.json()["phone_data"]
# write_ts may not be the same as data.ts, specially in the case of
# transitions, where we first generate the data.ts in javascript and
# then pass it down to the native code to store
# normally, this doesn't matter because it is a microsecond difference, but
# it does matter in this case because we store several entries in quick
# succession and we want to find the entries within a particular range.
# Putting it into the "data" object makes the write_ts accessible in the
# subsequent dataframes, etc
for e in ret_list:
e["data"]["write_ts"] = e["metadata"]["write_ts"]
print("Found %d entries" % len(ret_list))
return ret_list
@abstractmethod
def retrieve_data(self, user, key_list, start_ts, end_ts):
pass

def retrieve_all_data_from_server(self, user_label, key_list):
return self.retrieve_data_from_server(user_label, key_list, 0,
def retrieve_all_data(self, user, key_list):
return self.retrieve_data(user, key_list, 0,
arrow.get().timestamp)

def get_current_spec(self):
all_spec_entry_list = self.retrieve_all_data_from_server(self.AUTHOR_EMAIL, ["config/evaluation_spec"])
all_spec_entry_list = self.retrieve_all_data(self.AUTHOR_EMAIL, ["config/evaluation_spec"])
curr_spec_entry = None
for s in all_spec_entry_list:
if s["data"]["label"]["id"] == self.CURR_SPEC_ID:
Expand Down Expand Up @@ -95,6 +70,25 @@ def get_ground_truth_for_trip(self, trip_id, start_ts, end_ts):

return tl

def read_until_done(self, phone_label, key, start_ts, end_ts):
    """
    Page through retrieve_data for a single key, advancing the window start
    to the write_ts of the last entry in each batch, until a batch comes
    back with <= 1 entry. Returns the concatenated entries.

    NOTE(review): a final batch of exactly 1 entry is treated as "done" and
    is NOT appended to the result — presumably because it duplicates the
    last entry already collected (the next window starts AT the previous
    write_ts); confirm against the server's range semantics.
    """
    # fix: dropped the unused `prev_retrieved_count` accumulator, a leftover
    # from the motion-activity variant whose loop condition actually reads it
    all_done = False
    location_entries = []
    curr_start_ts = start_ts

    while not all_done:
        print("About to retrieve data for %s from %s -> %s" % (phone_label, curr_start_ts, end_ts))
        curr_location_entries = self.retrieve_data(phone_label, [key], curr_start_ts, end_ts)
        print("Retrieved %d entries with timestamps %s..." % (len(curr_location_entries), [cle["data"]["ts"] for cle in curr_location_entries[0:10]]))
        if len(curr_location_entries) == 0 or len(curr_location_entries) == 1:
            all_done = True
        else:
            location_entries.extend(curr_location_entries)
            # advance past the last retrieved entry; the assert guards
            # against an infinite loop on a non-advancing batch
            new_start_ts = curr_location_entries[-1]["metadata"]["write_ts"]
            assert new_start_ts > curr_start_ts
            curr_start_ts = new_start_ts
    return location_entries

@staticmethod
def get_concat_trajectories(trip):
Expand Down Expand Up @@ -126,7 +120,6 @@ def get_shapes_for_leg(gt_leg):
else:
return {"loc": shp.geometry.shape(gt_leg["loc"]["geometry"])}


@classmethod
def get_geojson_for_leg(cls, gt_leg):
if gt_leg["type"] == "TRAVEL":
Expand All @@ -138,4 +131,44 @@ def get_geojson_for_leg(cls, gt_leg):
else:
gt_leg["loc"]["properties"]["style"] = {"color": "purple", "fillColor": "purple"}
return gt_leg["loc"]



class ServerSpecDetails(SpecDetails):
    """SpecDetails backed by a live datastore instance reached over HTTP."""

    def retrieve_data(self, user, key_list, start_ts, end_ts):
        """
        POST a timestamp-range query for the given user/keys to the
        datastore and return the matching entries, retrying once after a
        10-second pause on any failure.
        """
        post_body = {
            "user": user,
            "key_list": key_list,
            "start_time": start_ts,
            "end_time": end_ts
        }

        def query_once():
            # one POST attempt; raises on an HTTP error status
            response = requests.post(f"{self.DATASTORE_URL}/datastreams/find_entries/timestamp", json=post_body)
            print(f"{response=}")
            response.raise_for_status()
            return response.json()["phone_data"]

        print(f"Retrieving data for: {post_body=}")
        try:
            data = query_once()
        except Exception as e:
            # single retry; a second failure propagates to the caller
            print(f"Got {type(e).__name__}: {e}, retrying...")
            time.sleep(10)
            data = query_once()

        # surface metadata.write_ts inside the data payload so it survives
        # into downstream dataframes
        for entry in data:
            entry["data"]["write_ts"] = entry["metadata"]["write_ts"]

        print(f"Found {len(data)} entries")
        return data


class FileSpecDetails(SpecDetails):
    """SpecDetails backed by JSON dumps previously written by
    bin/dump_data_to_file.py instead of a live datastore."""

    def retrieve_data(self, user, key_list, start_ts, end_ts):
        """
        Load the entries for every key in key_list from
        data/<spec_id>/<user>/<key>/<start_ts>_<end_ts>.json and return them
        as one combined list.

        Raises AssertionError if an expected dump file is missing.
        """
        # BUG FIX: the accumulator used to be a class attribute referenced as
        # a bare name inside the method — a NameError at runtime (class
        # attributes are not in method scope), and shared mutable state across
        # calls even if it had resolved. Use a fresh local list per call.
        data = []
        for key in key_list:
            data_file = f"data/{self.CURR_SPEC_ID}/{user}/{key.replace('/', '~')}/{start_ts}_{end_ts}.json"
            assert os.path.isfile(data_file), f"dump file not found: {data_file}"
            with open(data_file, "r") as f:
                data.extend(json.load(f))
        return data
return data