Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
TeachMeTW committed Oct 15, 2024
1 parent d02f186 commit b6bb6b9
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 40 deletions.
52 changes: 40 additions & 12 deletions emission/storage/decorations/stats_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import time
# Standard imports
from future import standard_library
standard_library.install_aliases()
Expand All @@ -10,6 +11,7 @@
# Our imports
import emission.storage.timeseries.abstract_timeseries as esta
import emission.core.wrapper.entry as ecwe
import emission.core.timer as ec_timer


# metadata format is
Expand Down Expand Up @@ -44,21 +46,47 @@ def store_stats_entry(user_id, metadata_key, name, ts, reading):
new_entry = ecwe.Entry.create_entry(user_id, metadata_key, data)
return esta.TimeSeries.get_time_series(user_id).insert(new_entry)

def store_dashboard_time(code_fragment_name: str, ts: float, reading: float):
def store_dashboard_time(code_fragment_name: str, timer: ec_timer.Timer):
"""
Stores statistics about execution times in dashboard code. Both of our current dashboards generate _aggregate_ metrics. I don't see that changing in the foreseeable future, since we don't really want to work at a per-user level in the python dashboards. So we don't pass in the user_id, only a string indicating the name of the step being instrumented, and the value.
Stores statistics about execution times in dashboard code using a Timer object.
Both of our current dashboards generate _aggregate_ metrics. We do not work at a per-user level
in the Python dashboards, so we pass in only the name of the step being instrumented and the timing information.
Parameters:
- code_fragment_name (str): The name of the function being timed.
- ts (float): The timestamp when the function execution started.
- reading (float): The duration of the function execution in milliseconds.
Returns:
- InsertResult: The result of the insert operation.
- code_fragment_name (str): The name of the function or code fragment being timed.
- timer (ec_timer.Timer): The Timer object that records the execution duration.
"""
return store_stats_entry(None, "stats/dashboard_time", code_fragment_name, ts, reading)
# Extract the elapsed time in seconds and convert to milliseconds
elapsed_seconds = timer.elapsed # Access the elapsed time in seconds
elapsed_ms = elapsed_seconds * 1000 # Convert to milliseconds

# Get the current timestamp in seconds since epoch
timestamp = time.time()

# Call the existing store_stats_entry function
store_stats_entry(
user_id=None, # No user ID as per current dashboard design
metadata_key="stats/dashboard_time",
name=code_fragment_name,
ts=timestamp,
reading=elapsed_ms
)


def store_dashboard_error(code_fragment_name: str, timer: ec_timer.Timer):
# Extract the elapsed time in seconds and convert to milliseconds
elapsed_seconds = timer.elapsed # Access the elapsed time in seconds
elapsed_ms = elapsed_seconds * 1000 # Convert to milliseconds

# Get the current timestamp in seconds since epoch
timestamp = time.time()

def store_dashboard_error(code_fragment_name: str, ts: float, reading: float):
store_stats_entry(None, "stats/dashboard_error", code_fragment_name, ts, reading)
# Call the existing store_stats_entry function
store_stats_entry(
user_id=None, # No user ID as per current dashboard design
metadata_key="stats/dashboard_error",
name=code_fragment_name,
ts=timestamp,
reading=elapsed_ms
)

37 changes: 9 additions & 28 deletions emission/tests/funcTests/TestFunctionTiming.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def execute_and_time_function(func: t.Callable[[], bool]):
- func (Callable[[], bool]): The test function to execute and time.
"""
function_name = func.__name__
timestamp = time.time()

logging.info(f"Starting timing for function: {function_name}")

Expand All @@ -61,8 +60,7 @@ def execute_and_time_function(func: t.Callable[[], bool]):
# Store the execution time
sdq.store_dashboard_time(
code_fragment_name=function_name,
ts=timestamp,
reading=elapsed_ms
timer=timer
)
print(f"Function '{function_name}' executed successfully in {elapsed_ms:.2f} ms.")
logging.info(f"Function '{function_name}' executed successfully in {elapsed_ms:.2f} ms.")
Expand All @@ -71,25 +69,17 @@ def execute_and_time_function(func: t.Callable[[], bool]):
timeseries_db = esta.TimeSeries.get_time_series(None)

# Retrieve the document
stored_document = timeseries_db.get_entry_at_ts(
key="stats/dashboard_time",
ts_key="data.ts",
ts=timestamp
stored_document = timeseries_db.get_first_entry(
key=f"stats/dashboard_time",
field="data.ts",
sort_order=pymongo.DESCENDING,
)

if stored_document:
# Inspect the stored document
stored_ts = stored_document.get("data", {}).get("ts", 0)
stored_reading = stored_document.get("data", {}).get("reading", 0)
logging.debug(f"Stored Document for '{function_name}': ts={stored_ts}, reading={stored_reading}")

# Check if the reading is within a reasonable tolerance (e.g., ±100 ms)
if abs(stored_reading - elapsed_ms) <= 100:
print(f"Verification passed: Data for '{function_name}' is stored correctly.")
logging.info(f"Verification passed: Data for '{function_name}' is stored correctly.")
else:
print(f"Verification failed: 'reading' value for '{function_name}' is outside the expected range.")
logging.error(f"Verification failed: 'reading' value for '{function_name}' is outside the expected range.")
else:
print(f"Verification failed: Data for '{function_name}' was not found in the database.")
logging.error(f"Verification failed: Data for '{function_name}' was not found in the database.")
Expand All @@ -102,8 +92,7 @@ def execute_and_time_function(func: t.Callable[[], bool]):
# Store the error timing
sdq.store_dashboard_error(
code_fragment_name=function_name,
ts=timestamp,
reading=elapsed_ms
timer=timer
)
print(f"Function '{function_name}' failed after {elapsed_ms:.2f} ms with error: {e}")
logging.error(f"Function '{function_name}' failed after {elapsed_ms:.2f} ms with error: {e}")
Expand All @@ -113,24 +102,16 @@ def execute_and_time_function(func: t.Callable[[], bool]):
timeseries_db = esta.TimeSeries.get_time_series(None)

# Retrieve the document
stored_error = timeseries_db.get_entry_at_ts(
stored_error = timeseries_db.get_first_entry(
key="stats/dashboard_error",
ts_key="data.ts",
ts=timestamp
field="data.ts",
sort_order=pymongo.DESCENDING,
)


if stored_error:
stored_ts = stored_error.get("data", {}).get("ts", 0)
stored_reading = stored_error.get("data", {}).get("reading", 0)
logging.debug(f"Stored Error Document for '{function_name}': ts={stored_ts}, reading={stored_reading}")

if abs(stored_reading - elapsed_ms) <= 100:
print(f"Error verification passed: Error for '{function_name}' is stored correctly.")
logging.info(f"Error verification passed: Error for '{function_name}' is stored correctly.")
else:
print(f"Error verification failed: 'reading' value for '{function_name}' error is outside the expected range.")
logging.error(f"Error verification failed: 'reading' value for '{function_name}' error is outside the expected range.")
else:
print(f"Error verification failed: Error for '{function_name}' was not found in the database.")
logging.error(f"Error verification failed: Error for '{function_name}' was not found in the database.")
Expand Down

0 comments on commit b6bb6b9

Please sign in to comment.