Skip to content

Commit

Permalink
Used TimeSeries Interface and made it so _get_query accepts dict and …
Browse files Browse the repository at this point in the history
…objects
  • Loading branch information
TeachMeTW committed Oct 15, 2024
1 parent 196e410 commit fc3f0ee
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 39 deletions.
61 changes: 43 additions & 18 deletions emission/storage/timeseries/builtin_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,28 +130,53 @@ def _get_query(self, key_list = None, time_query = None, geo_query = None,
:return:
"""
ret_query = {"invalid": {"$exists": False}}
ret_query.update(self.user_query)
if key_list is not None and len(key_list) > 0:
key_query_list = []
for key in key_list:
key_query_list.append(self.key_query(key))
ret_query.update({"$or": key_query_list})
if time_query is not None:
ret_query.update(time_query.get_query())
if geo_query is not None:
ret_query.update(geo_query.get_query())
if extra_query_list is not None:

# Update with user-specific queries
if hasattr(self, 'user_query') and isinstance(self.user_query, dict):
ret_query.update(self.user_query)
elif hasattr(self, 'user_query'):
logging.warning("user_query is not a dict and will not be used.")

# Handle key_list
if key_list:
key_query_list = [self.key_query(key) for key in key_list]
ret_query["$or"] = key_query_list

# Handle time_query
if time_query:
if hasattr(time_query, 'get_query') and callable(time_query.get_query):
time_query_dict = time_query.get_query()
ret_query.update(time_query_dict)
elif isinstance(time_query, dict):
ret_query.update(time_query)
else:
raise TypeError("time_query must have a get_query method or be a dict")

# Handle geo_query
if geo_query:
if hasattr(geo_query, 'get_query') and callable(geo_query.get_query):
geo_query_dict = geo_query.get_query()
ret_query.update(geo_query_dict)
elif isinstance(geo_query, dict):
ret_query.update(geo_query)
else:
raise TypeError("geo_query must have a get_query method or be a dict")

# Handle extra_query_list
if extra_query_list:
for extra_query in extra_query_list:
if not isinstance(extra_query, dict):
raise TypeError("Each extra_query must be a dict")
eq_keys = set(extra_query.keys())
curr_keys = set(ret_query.keys())
overlap_keys = eq_keys.intersection(curr_keys)
if len(overlap_keys) != 0:
logging.info("eq_keys = %s, curr_keys = %s, overlap_keys = %s" %
(eq_keys, curr_keys, overlap_keys))
raise AttributeError("extra query would overwrite keys %s" %
list(overlap_keys))
else:
ret_query.update(extra_query)
if overlap_keys:
logging.info(
f"eq_keys = {eq_keys}, curr_keys = {curr_keys}, overlap_keys = {overlap_keys}"
)
raise AttributeError(f"extra query would overwrite keys {list(overlap_keys)}")
ret_query.update(extra_query)

return ret_query

def _get_sort_key(self, time_query = None):
Expand Down
50 changes: 29 additions & 21 deletions emission/tests/funcTests/TestFunctionTiming.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import time
import typing as t
import pymongo

# Import the store_dashboard_time and store_dashboard_error functions
import emission.storage.decorations.stats_queries as sdq
Expand All @@ -11,8 +12,8 @@
import emission.core.timer as ec_timer

# Import the database module for verification
import emission.storage.timeseries.builtin_timeseries as bits
builtin_ts = bits.BuiltinTimeSeries(None)
import emission.storage.timeseries.abstract_timeseries as esta

# Define test functions
def test_function_1():
logging.info("Executing test_function_1")
Expand Down Expand Up @@ -67,20 +68,21 @@ def execute_and_time_function(func: t.Callable[[], bool]):
logging.info(f"Function '{function_name}' executed successfully in {elapsed_ms:.2f} ms.")

# Verification: Adjusted Query to Match Document Structure
timeseries_db = builtin_ts.get_timeseries_db(key="stats/dashboard_time")

timeseries_db = esta.TimeSeries.get_time_series(None)

query = {
"metadata.key": "stats/dashboard_time",
"data.name": function_name,
# Define the time range and additional filters
time_query = {
"data.ts": {"$gte": timestamp, "$lte": timestamp},
"data.reading": {"$gte": elapsed_ms, "$lte": elapsed_ms}
"data.name": function_name,
"data.reading": {"$gte": elapsed_ms, "$lte": elapsed_ms}
}

# Retrieve the most recent document for the function
stored_document = timeseries_db.find_one(
query,
sort=[("data.ts", -1)]

# Retrieve the first matching entry using the get_first_entry method
stored_document = timeseries_db.get_first_entry(
key="stats/dashboard_time",
field="data.ts",
sort_order=pymongo.DESCENDING,
time_query=time_query
)

if stored_document:
Expand Down Expand Up @@ -115,19 +117,25 @@ def execute_and_time_function(func: t.Callable[[], bool]):
logging.error(f"Function '{function_name}' failed after {elapsed_ms:.2f} ms with error: {e}")

# Verification: Adjusted Error Query to Match Document Structure
timeseries_db = builtin_ts.get_timeseries_db(key="stats/dashboard_error")
# Initialize the TimeSeries database connection
timeseries_db = esta.TimeSeries.get_time_series(None)

error_query = {
"metadata.key": "stats/dashboard_error",
"data.name": function_name,
# Define the time range and additional filters
time_query = {
"data.ts": {"$gte": timestamp, "$lte": timestamp},
"data.name": function_name,
"data.reading": {"$gte": elapsed_ms, "$lte": elapsed_ms}
}
stored_error = timeseries_db.find_one(
error_query,
sort=[("data.ts", -1)]

# Retrieve the first matching entry using the get_first_entry method
stored_error = timeseries_db.get_first_entry(
key="stats/dashboard_error",
field="data.ts",
sort_order=pymongo.DESCENDING,
time_query=time_query
)


if stored_error:
stored_ts = stored_error.get("data", {}).get("ts", 0)
stored_reading = stored_error.get("data", {}).get("reading", 0)
Expand All @@ -148,7 +156,7 @@ def main():
function_list: t.List[t.Callable[[], bool]] = [
test_function_1,
test_function_2,
# test_function_faulty, # This will raise an exception
test_function_faulty, # This will raise an exception
test_function_3 # This should execute normally after the faulty function
]
# Execute and time each function
Expand Down

0 comments on commit fc3f0ee

Please sign in to comment.