From fc3f0eea6d2765f700a5024d2964e5c6f596cbfd Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Mon, 14 Oct 2024 20:46:39 -0700 Subject: [PATCH] Used TimeSeries Interface and made it so _get_query accepts dict and objects --- .../storage/timeseries/builtin_timeseries.py | 61 +++++++++++++------ .../tests/funcTests/TestFunctionTiming.py | 50 ++++++++------- 2 files changed, 72 insertions(+), 39 deletions(-) diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py index 39b87776f..037951e9b 100644 --- a/emission/storage/timeseries/builtin_timeseries.py +++ b/emission/storage/timeseries/builtin_timeseries.py @@ -130,28 +130,53 @@ def _get_query(self, key_list = None, time_query = None, geo_query = None, :return: """ ret_query = {"invalid": {"$exists": False}} - ret_query.update(self.user_query) - if key_list is not None and len(key_list) > 0: - key_query_list = [] - for key in key_list: - key_query_list.append(self.key_query(key)) - ret_query.update({"$or": key_query_list}) - if time_query is not None: - ret_query.update(time_query.get_query()) - if geo_query is not None: - ret_query.update(geo_query.get_query()) - if extra_query_list is not None: + + # Update with user-specific queries + if hasattr(self, 'user_query') and isinstance(self.user_query, dict): + ret_query.update(self.user_query) + elif hasattr(self, 'user_query'): + logging.warning("user_query is not a dict and will not be used.") + + # Handle key_list + if key_list: + key_query_list = [self.key_query(key) for key in key_list] + ret_query["$or"] = key_query_list + + # Handle time_query + if time_query: + if hasattr(time_query, 'get_query') and callable(time_query.get_query): + time_query_dict = time_query.get_query() + ret_query.update(time_query_dict) + elif isinstance(time_query, dict): + ret_query.update(time_query) + else: + raise TypeError("time_query must have a get_query method or be a dict") + + # Handle geo_query + if geo_query: + if hasattr(geo_query, 'get_query') and callable(geo_query.get_query): + geo_query_dict = geo_query.get_query() + ret_query.update(geo_query_dict) + elif isinstance(geo_query, dict): + ret_query.update(geo_query) + else: + raise TypeError("geo_query must have a get_query method or be a dict") + + # Handle extra_query_list + if extra_query_list: for extra_query in extra_query_list: + if not isinstance(extra_query, dict): + raise TypeError("Each extra_query must be a dict") eq_keys = set(extra_query.keys()) curr_keys = set(ret_query.keys()) overlap_keys = eq_keys.intersection(curr_keys) - if len(overlap_keys) != 0: - logging.info("eq_keys = %s, curr_keys = %s, overlap_keys = %s" % - (eq_keys, curr_keys, overlap_keys)) - raise AttributeError("extra query would overwrite keys %s" % - list(overlap_keys)) - else: - ret_query.update(extra_query) + if overlap_keys: + logging.info( + f"eq_keys = {eq_keys}, curr_keys = {curr_keys}, overlap_keys = {overlap_keys}" + ) + raise AttributeError(f"extra query would overwrite keys {list(overlap_keys)}") + ret_query.update(extra_query) + return ret_query def _get_sort_key(self, time_query = None): diff --git a/emission/tests/funcTests/TestFunctionTiming.py b/emission/tests/funcTests/TestFunctionTiming.py index 86344fdc9..e5c16eb88 100644 --- a/emission/tests/funcTests/TestFunctionTiming.py +++ b/emission/tests/funcTests/TestFunctionTiming.py @@ -3,6 +3,7 @@ import logging import time import typing as t +import pymongo # Import the store_dashboard_time and store_dashboard_error functions import emission.storage.decorations.stats_queries as sdq @@ -11,8 +12,8 @@ import emission.core.timer as ec_timer # Import the database module for verification -import emission.storage.timeseries.builtin_timeseries as bits -builtin_ts = bits.BuiltinTimeSeries(None) +import emission.storage.timeseries.abstract_timeseries as esta + # Define test functions def test_function_1(): logging.info("Executing test_function_1") @@ -67,20 +68,21 @@ def execute_and_time_function(func: t.Callable[[], bool]): logging.info(f"Function '{function_name}' executed successfully in {elapsed_ms:.2f} ms.") # Verification: Adjusted Query to Match Document Structure - timeseries_db = builtin_ts.get_timeseries_db(key="stats/dashboard_time") - + timeseries_db = esta.TimeSeries.get_time_series(None) - query = { - "metadata.key": "stats/dashboard_time", - "data.name": function_name, + # Define the time range and additional filters + time_query = { "data.ts": {"$gte": timestamp, "$lte": timestamp}, - "data.reading": {"$gte": elapsed_ms, "$lte": elapsed_ms} + "data.name": function_name, + "data.reading": {"$gte": elapsed_ms, "$lte": elapsed_ms} } - - # Retrieve the most recent document for the function - stored_document = timeseries_db.find_one( - query, - sort=[("data.ts", -1)] + + # Retrieve the first matching entry using the get_first_entry method + stored_document = timeseries_db.get_first_entry( + key="stats/dashboard_time", + field="data.ts", + sort_order=pymongo.DESCENDING, + time_query=time_query ) if stored_document: @@ -115,19 +117,25 @@ def execute_and_time_function(func: t.Callable[[], bool]): logging.error(f"Function '{function_name}' failed after {elapsed_ms:.2f} ms with error: {e}") # Verification: Adjusted Error Query to Match Document Structure - timeseries_db = builtin_ts.get_timeseries_db(key="stats/dashboard_error") + # Initialize the TimeSeries database connection + timeseries_db = esta.TimeSeries.get_time_series(None) - error_query = { - "metadata.key": "stats/dashboard_error", - "data.name": function_name, + # Define the time range and additional filters + time_query = { "data.ts": {"$gte": timestamp, "$lte": timestamp}, + "data.name": function_name, "data.reading": {"$gte": elapsed_ms, "$lte": elapsed_ms} } - stored_error = timeseries_db.find_one( - error_query, - sort=[("data.ts", -1)] + + # Retrieve the first matching entry using the get_first_entry method + stored_error = timeseries_db.get_first_entry( + key="stats/dashboard_error", + field="data.ts", + sort_order=pymongo.DESCENDING, + time_query=time_query ) + if stored_error: stored_ts = stored_error.get("data", {}).get("ts", 0) stored_reading = stored_error.get("data", {}).get("reading", 0) @@ -148,7 +156,7 @@ def main(): function_list: t.List[t.Callable[[], bool]] = [ test_function_1, test_function_2, - # test_function_faulty, # This will raise an exception + test_function_faulty, # This will raise an exception test_function_3 # This should execute normally after the faulty function ] # Execute and time each function