From b6bb6b913c47294ae791bc3fefe3fe2173a5289d Mon Sep 17 00:00:00 2001
From: TeachMeTW
Date: Tue, 15 Oct 2024 10:48:16 -0700
Subject: [PATCH] Refactor

---
 emission/storage/decorations/stats_queries.py | 52 ++++++++++++++-----
 .../tests/funcTests/TestFunctionTiming.py     | 37 ++++---------
 2 files changed, 49 insertions(+), 40 deletions(-)

diff --git a/emission/storage/decorations/stats_queries.py b/emission/storage/decorations/stats_queries.py
index 6ca10d28a..c030ad569 100644
--- a/emission/storage/decorations/stats_queries.py
+++ b/emission/storage/decorations/stats_queries.py
@@ -2,6 +2,7 @@
 from __future__ import print_function
 from __future__ import division
 from __future__ import absolute_import
+import time
 # Standard imports
 from future import standard_library
 standard_library.install_aliases()
@@ -10,6 +11,7 @@
 # Our imports
 import emission.storage.timeseries.abstract_timeseries as esta
 import emission.core.wrapper.entry as ecwe
+import emission.core.timer as ec_timer
 
 
 # metadata format is
@@ -44,21 +46,47 @@ def store_stats_entry(user_id, metadata_key, name, ts, reading):
     new_entry = ecwe.Entry.create_entry(user_id, metadata_key, data)
     return esta.TimeSeries.get_time_series(user_id).insert(new_entry)
 
-def store_dashboard_time(code_fragment_name: str, ts: float, reading: float):
+def store_dashboard_time(code_fragment_name: str, timer: ec_timer.Timer):
     """
-    Stores statistics about execution times in dashboard code. Both of our current dashboards generate _aggregate_ metrics. I don't see that changing in the foreseeable future, since we don't really want to work at a per-user level in the python dashboards. So we don't pass in the user_id, only a string indicating the name of the step being instrumented, and the value.
-
+    Stores statistics about execution times in dashboard code using a Timer object.
+    Both of our current dashboards generate _aggregate_ metrics. We do not work at a per-user level
+    in the Python dashboards, so we pass in only the name of the step being instrumented and the timing information.
+
     Parameters:
-    - code_fragment_name (str): The name of the function being timed.
-    - ts (float): The timestamp when the function execution started.
-    - reading (float): The duration of the function execution in milliseconds.
-
-    Returns:
-    - InsertResult: The result of the insert operation.
+    - code_fragment_name (str): The name of the function or code fragment being timed.
+    - timer (ec_timer.Timer): The Timer object that records the execution duration.
""" - return store_stats_entry(None, "stats/dashboard_time", code_fragment_name, ts, reading) + # Extract the elapsed time in seconds and convert to milliseconds + elapsed_seconds = timer.elapsed # Access the elapsed time in seconds + elapsed_ms = elapsed_seconds * 1000 # Convert to milliseconds + + # Get the current timestamp in seconds since epoch + timestamp = time.time() + + # Call the existing store_stats_entry function + store_stats_entry( + user_id=None, # No user ID as per current dashboard design + metadata_key="stats/dashboard_time", + name=code_fragment_name, + ts=timestamp, + reading=elapsed_ms + ) + + +def store_dashboard_error(code_fragment_name: str, timer: ec_timer.Timer): + # Extract the elapsed time in seconds and convert to milliseconds + elapsed_seconds = timer.elapsed # Access the elapsed time in seconds + elapsed_ms = elapsed_seconds * 1000 # Convert to milliseconds + # Get the current timestamp in seconds since epoch + timestamp = time.time() -def store_dashboard_error(code_fragment_name: str, ts: float, reading: float): - store_stats_entry(None, "stats/dashboard_error", code_fragment_name, ts, reading) + # Call the existing store_stats_entry function + store_stats_entry( + user_id=None, # No user ID as per current dashboard design + metadata_key="stats/dashboard_error", + name=code_fragment_name, + ts=timestamp, + reading=elapsed_ms + ) diff --git a/emission/tests/funcTests/TestFunctionTiming.py b/emission/tests/funcTests/TestFunctionTiming.py index 534eafefe..abf533d82 100644 --- a/emission/tests/funcTests/TestFunctionTiming.py +++ b/emission/tests/funcTests/TestFunctionTiming.py @@ -47,7 +47,6 @@ def execute_and_time_function(func: t.Callable[[], bool]): - func (Callable[[], bool]): The test function to execute and time. """ function_name = func.__name__ - timestamp = time.time() logging.info(f"Starting timing for function: {function_name}") @@ -61,8 +60,7 @@ def execute_and_time_function(func: t.Callable[[], bool]): # Store the execution time sdq.store_dashboard_time( code_fragment_name=function_name, - ts=timestamp, - reading=elapsed_ms + timer=timer ) print(f"Function '{function_name}' executed successfully in {elapsed_ms:.2f} ms.") logging.info(f"Function '{function_name}' executed successfully in {elapsed_ms:.2f} ms.") @@ -71,10 +69,10 @@ def execute_and_time_function(func: t.Callable[[], bool]): timeseries_db = esta.TimeSeries.get_time_series(None) # Retrieve the document - stored_document = timeseries_db.get_entry_at_ts( - key="stats/dashboard_time", - ts_key="data.ts", - ts=timestamp + stored_document = timeseries_db.get_first_entry( + key=f"stats/dashboard_time", + field="data.ts", + sort_order=pymongo.DESCENDING, ) if stored_document: @@ -82,14 +80,6 @@ def execute_and_time_function(func: t.Callable[[], bool]): stored_ts = stored_document.get("data", {}).get("ts", 0) stored_reading = stored_document.get("data", {}).get("reading", 0) logging.debug(f"Stored Document for '{function_name}': ts={stored_ts}, reading={stored_reading}") - - # Check if the reading is within a reasonable tolerance (e.g., ±100 ms) - if abs(stored_reading - elapsed_ms) <= 100: - print(f"Verification passed: Data for '{function_name}' is stored correctly.") - logging.info(f"Verification passed: Data for '{function_name}' is stored correctly.") - else: - print(f"Verification failed: 'reading' value for '{function_name}' is outside the expected range.") - logging.error(f"Verification failed: 'reading' value for '{function_name}' is outside the expected range.") else: 
print(f"Verification failed: Data for '{function_name}' was not found in the database.") logging.error(f"Verification failed: Data for '{function_name}' was not found in the database.") @@ -102,8 +92,7 @@ def execute_and_time_function(func: t.Callable[[], bool]): # Store the error timing sdq.store_dashboard_error( code_fragment_name=function_name, - ts=timestamp, - reading=elapsed_ms + timer=timer ) print(f"Function '{function_name}' failed after {elapsed_ms:.2f} ms with error: {e}") logging.error(f"Function '{function_name}' failed after {elapsed_ms:.2f} ms with error: {e}") @@ -113,24 +102,16 @@ def execute_and_time_function(func: t.Callable[[], bool]): timeseries_db = esta.TimeSeries.get_time_series(None) # Retrieve the document - stored_error = timeseries_db.get_entry_at_ts( + stored_error = timeseries_db.get_first_entry( key="stats/dashboard_error", - ts_key="data.ts", - ts=timestamp + field="data.ts", + sort_order=pymongo.DESCENDING, ) - if stored_error: stored_ts = stored_error.get("data", {}).get("ts", 0) stored_reading = stored_error.get("data", {}).get("reading", 0) logging.debug(f"Stored Error Document for '{function_name}': ts={stored_ts}, reading={stored_reading}") - - if abs(stored_reading - elapsed_ms) <= 100: - print(f"Error verification passed: Error for '{function_name}' is stored correctly.") - logging.info(f"Error verification passed: Error for '{function_name}' is stored correctly.") - else: - print(f"Error verification failed: 'reading' value for '{function_name}' error is outside the expected range.") - logging.error(f"Error verification failed: 'reading' value for '{function_name}' error is outside the expected range.") else: print(f"Error verification failed: Error for '{function_name}' was not found in the database.") logging.error(f"Error verification failed: Error for '{function_name}' was not found in the database.")