From 7f12f67aa17c06022076b86865213599b8fa0e4e Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Tue, 15 Oct 2024 18:00:29 -0700 Subject: [PATCH] Reverts --- .../storage/timeseries/builtin_timeseries.py | 49 ++++------------ .../storage/timeseries/non_user_timeseries.py | 26 ++------- .../tests/funcTests/TestFunctionTiming.py | 58 +++++++++---------- 3 files changed, 46 insertions(+), 87 deletions(-) diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py index f7e1a1240..78d785719 100644 --- a/emission/storage/timeseries/builtin_timeseries.py +++ b/emission/storage/timeseries/builtin_timeseries.py @@ -54,9 +54,9 @@ def __init__(self, user_id): "stats/server_api_time": self.timeseries_db, "stats/server_api_error": self.timeseries_db, "stats/pipeline_time": self.timeseries_db, - "stats/pipeline_error": self.timeseries_db, "stats/dashboard_time": self.timeseries_db, "stats/dashboard_error": self.timeseries_db, + "stats/pipeline_error": self.timeseries_db, "stats/client_time": self.timeseries_db, "stats/client_nav_event": self.timeseries_db, "stats/client_error": self.timeseries_db, @@ -130,43 +130,18 @@ def _get_query(self, key_list = None, time_query = None, geo_query = None, :return: """ ret_query = {"invalid": {"$exists": False}} - - # Update with user-specific queries - if hasattr(self, 'user_query') and isinstance(self.user_query, dict): - ret_query.update(self.user_query) - elif hasattr(self, 'user_query'): - logging.warning("user_query is not a dict and will not be used.") - - # Handle key_list - if key_list: - key_query_list = [self.key_query(key) for key in key_list] - ret_query["$or"] = key_query_list - - # Handle time_query - if time_query: - if hasattr(time_query, 'get_query') and callable(time_query.get_query): - time_query_dict = time_query.get_query() - ret_query.update(time_query_dict) - elif isinstance(time_query, dict): - ret_query.update(time_query) - else: - raise TypeError("time_query must have a get_query method or be a dict") - - # Handle geo_query - if geo_query: - if hasattr(geo_query, 'get_query') and callable(geo_query.get_query): - geo_query_dict = geo_query.get_query() - ret_query.update(geo_query_dict) - elif isinstance(geo_query, dict): - ret_query.update(geo_query) - else: - raise TypeError("geo_query must have a get_query method or be a dict") - - # Handle extra_query_list - if extra_query_list: + ret_query.update(self.user_query) + if key_list is not None and len(key_list) > 0: + key_query_list = [] + for key in key_list: + key_query_list.append(self.key_query(key)) + ret_query.update({"$or": key_query_list}) + if time_query is not None: + ret_query.update(time_query.get_query()) + if geo_query is not None: + ret_query.update(geo_query.get_query()) + if extra_query_list is not None: for extra_query in extra_query_list: - if not isinstance(extra_query, dict): - raise TypeError("Each extra_query must be a dict") eq_keys = set(extra_query.keys()) curr_keys = set(ret_query.keys()) overlap_keys = eq_keys.intersection(curr_keys) diff --git a/emission/storage/timeseries/non_user_timeseries.py b/emission/storage/timeseries/non_user_timeseries.py index f628b6153..9cc8055cc 100644 --- a/emission/storage/timeseries/non_user_timeseries.py +++ b/emission/storage/timeseries/non_user_timeseries.py @@ -5,12 +5,11 @@ import emission.core.get_database as edb import emission.storage.timeseries.builtin_timeseries as bits -import emission.core.wrapper.entry as ecwe # Added missing import -import emission.storage.timeseries.aggregate_timeseries as esta +import emission.core.wrapper.entry as ecwe class NonUserTimeSeries(bits.BuiltinTimeSeries): def __init__(self): - super(esta.AggregateTimeSeries, self).__init__(None) + super(ecwe.AggregateTimeSeries, self).__init__(None) self.user_query = {} self.timeseries_db = edb.get_non_user_timeseries_db() @@ -56,25 +55,13 @@ def insert(self, entry): if type(entry) == dict: entry = ecwe.Entry(entry) if entry["user_id"] is not None: - raise AttributeError( - f"Saving entry {entry} for {entry['user_id']} in non_user_timeseries is not allowed." - ) + raise AttributeError("Saving entry %s for %s in non_user_timeseries" % + (entry, entry["user_id"])) else: logging.debug("entry was fine, no need to fix it") - # Get the collection and log its full name - collection = self.get_timeseries_db(entry.metadata.key) - logging.debug(f"Collection used for insertion: {collection.full_name}") - - logging.debug(f"Inserting entry {entry} into timeseries") - try: - result = collection.insert_one(entry) - logging.debug(f"Inserted entry with ID: {result.inserted_id}") - return result.inserted_id - except pymongo.errors.PyMongoError as e: - logging.error(f"Failed to insert entry: {e}") - raise - + logging.debug("Inserting entry %s into timeseries" % entry) + return self.get_timeseries_db(entry.metadata.key).insert(entry) # insert_data is unchanged def insert_error(self, entry): @@ -92,7 +79,6 @@ def update(entry): """ raise AttributeError("non_user_timeseries does not support updates") - @staticmethod def update_data(user_id, key, obj_id, data): """ diff --git a/emission/tests/funcTests/TestFunctionTiming.py b/emission/tests/funcTests/TestFunctionTiming.py index 71df76e3e..b4e725770 100644 --- a/emission/tests/funcTests/TestFunctionTiming.py +++ b/emission/tests/funcTests/TestFunctionTiming.py @@ -97,27 +97,26 @@ def execute_and_time_function(self, func: t.Callable[[], bool]): stored_documents_chain = self.timeseries_db.find_entries(["stats/dashboard_time"], time_query=None) # Convert the chain to a list to make it subscriptable and to allow multiple accesses - stored_documents = list(stored_documents_chain) + stored_document = list(stored_documents_chain) # Assert that at least one document was retrieved self.assertTrue( - len(stored_documents) > 0, + len(stored_document) > 0, f"Data for '{function_name}' was not found in the database." ) - + stored_document = stored_document[0] # Iterate over each document and inspect its contents - for idx, stored_document in enumerate(stored_documents): - try: - stored_ts = stored_document['data']['ts'] - stored_reading = stored_document['data']['reading'] - stored_name = stored_document['data']['name'] - logging.debug( - f"Stored Document {idx} for '{function_name}': ts={stored_ts}, reading={stored_reading}, name={stored_name}" - ) - except KeyError as e: - self.fail( - f"Missing key {e} in stored document {idx} for '{function_name}'." - ) + try: + stored_ts = stored_document['data']['ts'] + stored_reading = stored_document['data']['reading'] + stored_name = stored_document['data']['name'] + logging.debug( + f"Stored Document for '{function_name}': ts={stored_ts}, reading={stored_reading}, name={stored_name}" + ) + except KeyError as e: + self.fail( + f"Missing key {e} in stored document for '{function_name}'." + ) # Assert that the stored_reading_error matches elapsed_ms exactly self.assertEqual( @@ -155,27 +154,26 @@ def execute_and_time_function(self, func: t.Callable[[], bool]): stored_error_chain = self.timeseries_db.find_entries(["stats/dashboard_error"], time_query=None) # Convert the chain to a list to make it subscriptable and to allow multiple accesses - stored_errors = list(stored_error_chain) + stored_error = list(stored_error_chain) # Assert that at least one document was retrieved self.assertTrue( - len(stored_errors) > 0, + len(stored_error) > 0, f"Data for '{function_name}' was not found in the database." ) - + stored_error = stored_error[0] # Iterate over each document and inspect its contents - for idx, stored_error in enumerate(stored_errors): - try: - stored_ts_error = stored_error['data']['ts'] - stored_reading_error = stored_error['data']['reading'] - stored_name_error = stored_error['data']['name'] - logging.debug( - f"Stored Document {idx} for '{function_name}': ts={stored_ts_error}, reading={stored_reading_error}, name={stored_name_error}" - ) - except KeyError as e: - self.fail( - f"Missing key {e} in stored document {idx} for '{function_name}'." - ) + try: + stored_ts_error = stored_error['data']['ts'] + stored_reading_error = stored_error['data']['reading'] + stored_name_error = stored_error['data']['name'] + logging.debug( + f"Stored Document for '{function_name}': ts={stored_ts_error}, reading={stored_reading_error}, name={stored_name_error}" + ) + except KeyError as e: + self.fail( + f"Missing key {e} in stored document for '{function_name}'." + ) # Assert that the stored_reading_error matches elapsed_ms exactly self.assertEqual( stored_reading_error,