
Reverts
TeachMeTW committed Oct 16, 2024
1 parent 5ba94d7 commit 7f12f67
Showing 3 changed files with 46 additions and 87 deletions.
49 changes: 12 additions & 37 deletions emission/storage/timeseries/builtin_timeseries.py
@@ -54,9 +54,9 @@ def __init__(self, user_id):
"stats/server_api_time": self.timeseries_db,
"stats/server_api_error": self.timeseries_db,
"stats/pipeline_time": self.timeseries_db,
"stats/pipeline_error": self.timeseries_db,
"stats/dashboard_time": self.timeseries_db,
"stats/dashboard_error": self.timeseries_db,
"stats/pipeline_error": self.timeseries_db,
"stats/client_time": self.timeseries_db,
"stats/client_nav_event": self.timeseries_db,
"stats/client_error": self.timeseries_db,
@@ -130,43 +130,18 @@ def _get_query(self, key_list = None, time_query = None, geo_query = None,
:return:
"""
ret_query = {"invalid": {"$exists": False}}

# Update with user-specific queries
if hasattr(self, 'user_query') and isinstance(self.user_query, dict):
ret_query.update(self.user_query)
elif hasattr(self, 'user_query'):
logging.warning("user_query is not a dict and will not be used.")

# Handle key_list
if key_list:
key_query_list = [self.key_query(key) for key in key_list]
ret_query["$or"] = key_query_list

# Handle time_query
if time_query:
if hasattr(time_query, 'get_query') and callable(time_query.get_query):
time_query_dict = time_query.get_query()
ret_query.update(time_query_dict)
elif isinstance(time_query, dict):
ret_query.update(time_query)
else:
raise TypeError("time_query must have a get_query method or be a dict")

# Handle geo_query
if geo_query:
if hasattr(geo_query, 'get_query') and callable(geo_query.get_query):
geo_query_dict = geo_query.get_query()
ret_query.update(geo_query_dict)
elif isinstance(geo_query, dict):
ret_query.update(geo_query)
else:
raise TypeError("geo_query must have a get_query method or be a dict")

# Handle extra_query_list
if extra_query_list:
ret_query.update(self.user_query)
if key_list is not None and len(key_list) > 0:
key_query_list = []
for key in key_list:
key_query_list.append(self.key_query(key))
ret_query.update({"$or": key_query_list})
if time_query is not None:
ret_query.update(time_query.get_query())
if geo_query is not None:
ret_query.update(geo_query.get_query())
if extra_query_list is not None:
for extra_query in extra_query_list:
if not isinstance(extra_query, dict):
raise TypeError("Each extra_query must be a dict")
eq_keys = set(extra_query.keys())
curr_keys = set(ret_query.keys())
overlap_keys = eq_keys.intersection(curr_keys)
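For readers skimming the revert: the restored _get_query simply layers the user query, an $or over the per-key filters, and the optional time/geo/extra queries onto one MongoDB filter dict. A rough standalone sketch of that composition pattern follows; the "metadata.key" filter shape and the key_query helper are assumptions for illustration, and the overlap check on extra_query keys from the diff is omitted, so this is not the actual BuiltinTimeSeries implementation.

# Illustrative sketch of the query-composition pattern restored above (not the
# real BuiltinTimeSeries code; the filter shapes are assumed).
def build_query(user_query, key_list=None, time_query=None, geo_query=None,
                extra_query_list=None):
    def key_query(key):
        # assumed shape of the per-key filter
        return {"metadata.key": key}

    ret_query = {"invalid": {"$exists": False}}
    ret_query.update(user_query)
    if key_list is not None and len(key_list) > 0:
        ret_query.update({"$or": [key_query(key) for key in key_list]})
    if time_query is not None:
        ret_query.update(time_query.get_query())
    if geo_query is not None:
        ret_query.update(geo_query.get_query())
    if extra_query_list is not None:
        for extra_query in extra_query_list:
            # the restored code also warns about keys that overlap ret_query;
            # that check is omitted here for brevity
            ret_query.update(extra_query)
    return ret_query

# Example: build_query({"user_id": None}, key_list=["stats/pipeline_time"])
# -> {"invalid": {"$exists": False}, "user_id": None,
#     "$or": [{"metadata.key": "stats/pipeline_time"}]}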
26 changes: 6 additions & 20 deletions emission/storage/timeseries/non_user_timeseries.py
@@ -5,12 +5,11 @@

import emission.core.get_database as edb
import emission.storage.timeseries.builtin_timeseries as bits
import emission.core.wrapper.entry as ecwe # Added missing import
import emission.storage.timeseries.aggregate_timeseries as esta
import emission.core.wrapper.entry as ecwe

class NonUserTimeSeries(bits.BuiltinTimeSeries):
def __init__(self):
super(esta.AggregateTimeSeries, self).__init__(None)
super(ecwe.AggregateTimeSeries, self).__init__(None)
self.user_query = {}
self.timeseries_db = edb.get_non_user_timeseries_db()

@@ -56,25 +55,13 @@ def insert(self, entry):
if type(entry) == dict:
entry = ecwe.Entry(entry)
if entry["user_id"] is not None:
raise AttributeError(
f"Saving entry {entry} for {entry['user_id']} in non_user_timeseries is not allowed."
)
raise AttributeError("Saving entry %s for %s in non_user_timeseries" %
(entry, entry["user_id"]))
else:
logging.debug("entry was fine, no need to fix it")

# Get the collection and log its full name
collection = self.get_timeseries_db(entry.metadata.key)
logging.debug(f"Collection used for insertion: {collection.full_name}")

logging.debug(f"Inserting entry {entry} into timeseries")
try:
result = collection.insert_one(entry)
logging.debug(f"Inserted entry with ID: {result.inserted_id}")
return result.inserted_id
except pymongo.errors.PyMongoError as e:
logging.error(f"Failed to insert entry: {e}")
raise

logging.debug("Inserting entry %s into timeseries" % entry)
return self.get_timeseries_db(entry.metadata.key).insert(entry)

# insert_data is unchanged
def insert_error(self, entry):
@@ -92,7 +79,6 @@ def update(entry):
"""
raise AttributeError("non_user_timeseries does not support updates")


@staticmethod
def update_data(user_id, key, obj_id, data):
"""
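One line worth pausing on in this hunk is the two-argument super() call in __init__: super(X, self) starts method lookup in self's MRO at the class after X, so the class passed in determines whose __init__ runs next. A minimal sketch of that behavior, using made-up class names unrelated to the emission codebase, is below.

# Minimal sketch of two-argument super(): lookup starts *after* the class given
# as the first argument. Class names here are hypothetical.
class Base:
    def __init__(self, user_id):
        print("Base.__init__ called with", user_id)

class Middle(Base):
    def __init__(self, user_id):
        print("Middle.__init__")
        super().__init__(user_id)

class Leaf(Middle):
    def __init__(self):
        # Skips Middle.__init__ and goes straight to Base.__init__, because
        # lookup starts after Middle in Leaf's MRO.
        super(Middle, self).__init__(None)

Leaf()  # prints only "Base.__init__ called with None"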
58 changes: 28 additions & 30 deletions emission/tests/funcTests/TestFunctionTiming.py
@@ -97,27 +97,26 @@ def execute_and_time_function(self, func: t.Callable[[], bool]):
stored_documents_chain = self.timeseries_db.find_entries(["stats/dashboard_time"], time_query=None)

# Convert the chain to a list to make it subscriptable and to allow multiple accesses
stored_documents = list(stored_documents_chain)
stored_document = list(stored_documents_chain)

# Assert that at least one document was retrieved
self.assertTrue(
len(stored_documents) > 0,
len(stored_document) > 0,
f"Data for '{function_name}' was not found in the database."
)

stored_document = stored_document[0]
# Iterate over each document and inspect its contents
for idx, stored_document in enumerate(stored_documents):
try:
stored_ts = stored_document['data']['ts']
stored_reading = stored_document['data']['reading']
stored_name = stored_document['data']['name']
logging.debug(
f"Stored Document {idx} for '{function_name}': ts={stored_ts}, reading={stored_reading}, name={stored_name}"
)
except KeyError as e:
self.fail(
f"Missing key {e} in stored document {idx} for '{function_name}'."
)
try:
stored_ts = stored_document['data']['ts']
stored_reading = stored_document['data']['reading']
stored_name = stored_document['data']['name']
logging.debug(
f"Stored Document for '{function_name}': ts={stored_ts}, reading={stored_reading}, name={stored_name}"
)
except KeyError as e:
self.fail(
f"Missing key {e} in stored document for '{function_name}'."
)

# Assert that the stored_reading_error matches elapsed_ms exactly
self.assertEqual(
@@ -155,27 +154,26 @@ def execute_and_time_function(self, func: t.Callable[[], bool]):
stored_error_chain = self.timeseries_db.find_entries(["stats/dashboard_error"], time_query=None)

# Convert the chain to a list to make it subscriptable and to allow multiple accesses
stored_errors = list(stored_error_chain)
stored_error = list(stored_error_chain)

# Assert that at least one document was retrieved
self.assertTrue(
len(stored_errors) > 0,
len(stored_error) > 0,
f"Data for '{function_name}' was not found in the database."
)

stored_error = stored_error[0]
# Iterate over each document and inspect its contents
for idx, stored_error in enumerate(stored_errors):
try:
stored_ts_error = stored_error['data']['ts']
stored_reading_error = stored_error['data']['reading']
stored_name_error = stored_error['data']['name']
logging.debug(
f"Stored Document {idx} for '{function_name}': ts={stored_ts_error}, reading={stored_reading_error}, name={stored_name_error}"
)
except KeyError as e:
self.fail(
f"Missing key {e} in stored document {idx} for '{function_name}'."
)
try:
stored_ts_error = stored_error['data']['ts']
stored_reading_error = stored_error['data']['reading']
stored_name_error = stored_error['data']['name']
logging.debug(
f"Stored Document for '{function_name}': ts={stored_ts_error}, reading={stored_reading_error}, name={stored_name_error}"
)
except KeyError as e:
self.fail(
f"Missing key {e} in stored document for '{function_name}'."
)
# Assert that the stored_reading_error matches elapsed_ms exactly
self.assertEqual(
stored_reading_error,
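The test changes above move from iterating over every stored document back to checking only the first document returned by find_entries. A minimal, self-contained sketch of that retrieve-first-then-assert pattern follows; find_entries is stubbed out and the document shape is assumed from the diff, so this is not the real timeseries API.

# Sketch of the retrieve-first-then-assert pattern used in the reverted test.
# fake_find_entries is a stub; the document shape is assumed from the diff.
import unittest

def fake_find_entries(key_list, time_query=None):
    return iter([{"data": {"ts": 1729000000.0, "reading": 12.5,
                           "name": "fake_function"}}])

class TestStoredTiming(unittest.TestCase):
    def test_first_stored_document(self):
        stored = list(fake_find_entries(["stats/dashboard_time"]))
        self.assertTrue(len(stored) > 0, "no timing document found")
        doc = stored[0]  # only the first document is inspected
        self.assertEqual(doc["data"]["name"], "fake_function")
        self.assertEqual(doc["data"]["reading"], 12.5)

if __name__ == "__main__":
    unittest.main()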
