Stats Timing #986

Merged (42 commits) on Oct 18, 2024
Changes from 32 commits
Commits (42)
acf7f3e
Typo Fix
TeachMeTW Oct 2, 2024
553a45d
Added Func Time Collection?
TeachMeTW Oct 4, 2024
d19392d
Fixed per feedback, removed decorator
TeachMeTW Oct 5, 2024
05bac58
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
e23d4b6
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
7fc3f81
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
e7e50da
Update emission/storage/timeseries/builtin_timeseries.py
TeachMeTW Oct 12, 2024
dd253dd
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
8957a06
Review Changes
TeachMeTW Oct 12, 2024
f428069
Changes
TeachMeTW Oct 13, 2024
4f636a9
Reverted some
TeachMeTW Oct 14, 2024
b2e3cb5
Added verification
TeachMeTW Oct 14, 2024
a1ff54e
resolve
TeachMeTW Oct 14, 2024
12d5e86
Fixed whitespace commits
TeachMeTW Oct 14, 2024
cf5b5a1
Removed unused imports
TeachMeTW Oct 14, 2024
2b5cefd
Removed unused imports
TeachMeTW Oct 14, 2024
5ac74fa
Delete miniconda.sh
TeachMeTW Oct 14, 2024
64cfc3f
Removed unnecessary
TeachMeTW Oct 14, 2024
a7884e5
Merge branch 'feature/stats_timing' of https://github.com/TeachMeTW/e…
TeachMeTW Oct 14, 2024
196e410
Timeseries
TeachMeTW Oct 14, 2024
fc3f0ee
Used TimeSeries Interface and made it so _get_query accepts dict and …
TeachMeTW Oct 15, 2024
d02f186
Used get entry at ts
TeachMeTW Oct 15, 2024
b6bb6b9
Refactor
TeachMeTW Oct 15, 2024
bc9f638
Turned into unit test
TeachMeTW Oct 15, 2024
f814102
Wrong comment
TeachMeTW Oct 15, 2024
5066c85
Update .gitignore
TeachMeTW Oct 15, 2024
9996707
Update emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW Oct 15, 2024
fed4964
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 15, 2024
09f93df
remvoed whitespace
TeachMeTW Oct 15, 2024
9c418c8
Used find_entries
TeachMeTW Oct 15, 2024
5ba94d7
Jack Changes
TeachMeTW Oct 15, 2024
7f12f67
Reverts
TeachMeTW Oct 16, 2024
4c7b80b
Reverted Non_user
TeachMeTW Oct 16, 2024
e843169
Added more tests
TeachMeTW Oct 16, 2024
9f9ec9e
Update emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW Oct 16, 2024
d76f63a
Update emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW Oct 16, 2024
f5b07a4
Implemented Review Changes
TeachMeTW Oct 16, 2024
a24b03b
simplify tests for store_dashboard_time / store_dashboard_error
JGreenlee Oct 17, 2024
39f763d
move TestFunctionTiming -> TestStatsQueries
JGreenlee Oct 17, 2024
1bdc922
Merge pull request #1 from JGreenlee/simpler_stats_timing
TeachMeTW Oct 17, 2024
db1e7bb
Added comments and changed param
TeachMeTW Oct 17, 2024
0b39803
Toned back comments
TeachMeTW Oct 18, 2024
4 changes: 4 additions & 0 deletions emission/core/wrapper/entry.py
@@ -60,8 +60,12 @@ def _getData2Wrapper():
"stats/server_api_error": "statsevent",
# pipeline stage time, measured on the server
"stats/pipeline_time": "statsevent",
# dashboard time, measured on the server
"stats/dashboard_time": "statsevent",
# intended to log the occurrence of errors in the pipeline
"stats/pipeline_error": "statsevent",
# intended to log the occurrence of errors in the dashboard
"stats/dashboard_error": "statsevent",
# time for various client operations, measured on the client
# comparison with the server_api_time can help debug networking issues
"stats/client_time": "statsevent",
2 changes: 1 addition & 1 deletion emission/pipeline/intake_stage.py
@@ -103,7 +103,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
eaum.match_incoming_user_inputs(uuid)

esds.store_pipeline_time(uuid, ecwp.PipelineStages.USER_INPUT_MATCH_INCOMING.name,
time.time(), uct.elapsed)
time.time(), uit.elapsed)

# Hack until we delete these spurious entries
# https://github.com/e-mission/e-mission-server/issues/407#issuecomment-2484868
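The one-line change above fixes a copy/paste slip: the USER_INPUT_MATCH_INCOMING stage was recording `uct.elapsed`, a timer variable defined elsewhere in `run_intake_pipeline_for_user`, instead of `uit.elapsed`, the timer that actually wraps this stage. A minimal sketch of the instrumentation pattern follows; the `run_stage_body` helper and the plain string stage name are placeholders for illustration, not code from this PR.

import time
import emission.core.timer as ec_timer
import emission.storage.decorations.stats_queries as esds

def run_stage_body(uuid):
    # Placeholder for the real stage body, e.g. eaum.match_incoming_user_inputs(uuid).
    pass

def timed_stage(uuid):
    # Wrap the stage in its own Timer and record that same timer's elapsed seconds.
    with ec_timer.Timer() as uit:
        run_stage_body(uuid)
    # The real pipeline passes ecwp.PipelineStages.USER_INPUT_MATCH_INCOMING.name here.
    esds.store_pipeline_time(uuid, "USER_INPUT_MATCH_INCOMING", time.time(), uit.elapsed)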
45 changes: 43 additions & 2 deletions emission/storage/decorations/stats_queries.py
@@ -2,16 +2,16 @@
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import time
# Standard imports
from future import standard_library
standard_library.install_aliases()
from builtins import *
import logging
import time

# Our imports
import emission.storage.timeseries.abstract_timeseries as esta
import emission.core.wrapper.entry as ecwe
import emission.core.timer as ec_timer


# metadata format is
@@ -46,3 +46,44 @@ def store_stats_entry(user_id, metadata_key, name, ts, reading):
new_entry = ecwe.Entry.create_entry(user_id, metadata_key, data)
return esta.TimeSeries.get_time_series(user_id).insert(new_entry)

def store_dashboard_time(code_fragment_name: str, timer: ec_timer.Timer):
"""
Stores statistics about execution times in dashboard code using a Timer object.
Both of our current dashboards generate _aggregate_ metrics. We do not work at a per-user level
in the Python dashboards, so we pass in only the name of the step being instrumented and the timing information.

Parameters:
- code_fragment_name (str): The name of the function or code fragment being timed.
- timer (ec_timer.Timer): The Timer object that records the execution duration.
"""
# Extract the elapsed time in seconds and convert to milliseconds
elapsed_ms = timer.elapsed * 1000 # Convert to milliseconds

# Get the current timestamp in seconds since epoch
timestamp = time.time()

# Call the existing store_stats_entry function
store_stats_entry(
user_id=None, # No user ID as per current dashboard design
metadata_key="stats/dashboard_time",
name=code_fragment_name,
ts=timestamp,
reading=elapsed_ms
)


def store_dashboard_error(code_fragment_name: str, timer: ec_timer.Timer):
"""
Stores statistics about errors in dashboard code, using the same aggregate
(no user_id) pattern as store_dashboard_time.

Parameters:
- code_fragment_name (str): The name of the function or code fragment that raised the error.
- timer (ec_timer.Timer): The Timer object recording the elapsed time before the error occurred.
"""
elapsed_ms = timer.elapsed * 1000

# Get the current timestamp in seconds since epoch
timestamp = time.time()

# Call the existing store_stats_entry function
store_stats_entry(
user_id=None, # No user ID as per current dashboard design
metadata_key="stats/dashboard_error",
name=code_fragment_name,
ts=timestamp,
reading=elapsed_ms
)
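For context, here is a minimal usage sketch (not part of this diff) of how a dashboard script might instrument a code block with these helpers; `generate_summary_metrics` and the fragment name are illustrative assumptions.

import emission.core.timer as ec_timer
import emission.storage.decorations.stats_queries as esds

def generate_summary_metrics():
    # Hypothetical dashboard step, used only for illustration.
    return sum(range(100000))

try:
    with ec_timer.Timer() as timer:
        generate_summary_metrics()
    # Records elapsed milliseconds under stats/dashboard_time with user_id=None.
    esds.store_dashboard_time("generate_summary_metrics", timer)
except Exception:
    # Records the time spent before the failure under stats/dashboard_error.
    esds.store_dashboard_error("generate_summary_metrics", timer)
    raise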

2 changes: 2 additions & 0 deletions emission/storage/timeseries/builtin_timeseries.py
@@ -54,6 +54,8 @@ def __init__(self, user_id):
"stats/server_api_time": self.timeseries_db,
"stats/server_api_error": self.timeseries_db,
"stats/pipeline_time": self.timeseries_db,
"stats/dashboard_time": self.timeseries_db,
"stats/dashboard_error": self.timeseries_db,
"stats/pipeline_error": self.timeseries_db,
"stats/client_time": self.timeseries_db,
"stats/client_nav_event": self.timeseries_db,
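Registering the new keys here routes them to the main timeseries collection, so stored timings can be read back through the standard TimeSeries interface. A short read-back sketch, assuming a configured database; the aggregate user_id=None access mirrors the test below.

import emission.storage.timeseries.abstract_timeseries as esta

ts = esta.TimeSeries.get_time_series(None)  # aggregate access, no user_id
for entry in ts.find_entries(["stats/dashboard_time"], time_query=None):
    # Each statsevent carries the fragment name, a unix timestamp, and the
    # reading (elapsed milliseconds for dashboard_time entries).
    print(entry["data"]["name"], entry["data"]["ts"], entry["data"]["reading"])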
3 changes: 2 additions & 1 deletion emission/storage/timeseries/non_user_timeseries.py
@@ -5,10 +5,11 @@

import emission.core.get_database as edb
import emission.storage.timeseries.builtin_timeseries as bits
import emission.core.wrapper.entry as ecwe

class NonUserTimeSeries(bits.BuiltinTimeSeries):
def __init__(self):
super(AggregateTimeSeries, self).__init__(None)
super(NonUserTimeSeries, self).__init__(None)
self.user_query = {}
self.timeseries_db = edb.get_non_user_timeseries_db()

223 changes: 223 additions & 0 deletions emission/tests/funcTests/TestFunctionTiming.py
@@ -0,0 +1,223 @@
# emission/tests/funcTests/TestFunctionTiming.py

import unittest
import logging
import time
import typing as t
import pymongo

# Import the store_dashboard_time and store_dashboard_error functions
import emission.storage.decorations.stats_queries as sdq

# Import the existing Timer context manager
import emission.core.timer as ec_timer

# Import the database module for verification
import emission.storage.timeseries.abstract_timeseries as esta
import emission.core.get_database as edb

class TestFunctionTiming(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""
Set up resources before any tests are run.
"""
logging.basicConfig(level=logging.INFO)
cls.timeseries_db = esta.TimeSeries.get_time_series(None)

def tearDown(self):
"""
Clean up relevant database entries after each test to maintain isolation.
"""
# Define the metadata keys to clean
keys_to_clean = ["stats/dashboard_time", "stats/dashboard_error"]
try:
# Delete entries where metadata.key is in keys_to_clean from timeseries_db
edb.get_timeseries_db().delete_many({"metadata.key": {"$in": keys_to_clean}})
logging.info("Cleaned up database entries after test based on metadata keys.")
except Exception as e:
logging.error(f"Error cleaning up database entries: {e}")
raise
Contributor comment:

If you are tearing down the database entries after every test, you will never end up with two entries, and will not be able to verify that the entries don't override each other.

You need to have a test that invokes multiple functions.

We are also not instrumenting only in chunks of one function at a time - we also want to instrument code blocks. Where is the test for that?
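
    # --- Sketch, not part of this PR: one possible answer to the review comment
    # above. It times two different fragments in a single test (so tearDown cannot
    # mask an overwrite), and the second fragment is an inline code block rather
    # than a function call. The fragment names are illustrative assumptions.
    def test_multiple_fragments_sketch(self):
        with ec_timer.Timer() as t1:
            test_function_no_delay()
        sdq.store_dashboard_time("fragment_one", t1)

        with ec_timer.Timer() as t2:
            total = sum(range(10000))  # an arbitrary instrumented code block
        sdq.store_dashboard_time("fragment_two", t2)

        stored = list(self.timeseries_db.find_entries(["stats/dashboard_time"], time_query=None))
        stored_names = [e["data"]["name"] for e in stored]
        # Both entries should coexist rather than overriding each other.
        self.assertIn("fragment_one", stored_names)
        self.assertIn("fragment_two", stored_names)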



def test_function_no_delay(self):
"""
Test execution and timing of test_function_no_delay.
"""
self.execute_and_time_function(test_function_no_delay)

def test_function_short_delay(self):
"""
Test execution and timing of test_function_short_delay.
"""
self.execute_and_time_function(test_function_short_delay)

def test_function_long_delay(self):
"""
Test execution and timing of test_function_long_delay.
"""
self.execute_and_time_function(test_function_long_delay)

def test_function_faulty(self):
"""
Test execution and timing of test_function_faulty, which is expected to raise an exception.
"""
with self.assertRaises(ValueError) as context:
self.execute_and_time_function(test_function_faulty)
self.assertIn("Simulated error in test_function_faulty", str(context.exception))

def execute_and_time_function(self, func: t.Callable[[], bool]):
"""
Executes a given function, measures its execution time using ec_timer.Timer,
stores the timing information using store_dashboard_time, and verifies
that the data was stored successfully by querying the timeseries database.
If the function raises an exception, it stores the error using store_dashboard_error
and verifies the error storage.

Parameters:
- func (Callable[[], bool]): The test function to execute and time.
"""
function_name = func.__name__
logging.info(f"Starting timing for function: {function_name}")

try:
with ec_timer.Timer() as timer:
result = func() # Execute the test function

elapsed_ms = (timer.elapsed * 1000) # Convert to milliseconds

# Store the execution time
sdq.store_dashboard_time(
code_fragment_name=function_name,
timer=timer
)
logging.info(f"Function '{function_name}' executed successfully in {elapsed_ms} ms.")

# Verification: query the timeseries for the stored timing entry
stored_documents_chain = self.timeseries_db.find_entries(["stats/dashboard_time"], time_query=None)

# Convert the chain to a list to make it subscriptable and to allow multiple accesses
stored_document = list(stored_documents_chain)

# Assert that at least one document was retrieved
self.assertTrue(
len(stored_document) > 0,
f"Data for '{function_name}' was not found in the database."
)
stored_document = stored_document[0]
# Inspect the contents of the first stored document
try:
stored_ts = stored_document['data']['ts']
stored_reading = stored_document['data']['reading']
stored_name = stored_document['data']['name']
logging.debug(
f"Stored Document for '{function_name}': ts={stored_ts}, reading={stored_reading}, name={stored_name}"
)
except KeyError as e:
self.fail(
f"Missing key {e} in stored document for '{function_name}'."
)

# Assert that the stored reading matches elapsed_ms exactly
self.assertEqual(
stored_reading,
elapsed_ms,
msg=(
f"Stored reading {stored_reading} ms does not exactly match "
f"elapsed time {elapsed_ms:.2f} ms for '{function_name}'."
)
)

# Assert that the stored name matches function_name exactly
self.assertEqual(
stored_name,
function_name,
msg=(
f"Stored name '{stored_name}' does not match function "
f"name '{function_name}'."
)
)

except Exception as e:
# Even if the function fails, capture the elapsed time up to the exception
elapsed_seconds = timer.elapsed if 'timer' in locals() else 0  # guard in case the Timer was never created
elapsed_ms = (elapsed_seconds * 1000) # Convert to milliseconds

# Store the error timing
sdq.store_dashboard_error(
code_fragment_name=function_name,
timer=timer
)
logging.error(f"Function '{function_name}' failed after {elapsed_ms} ms with error: {e}")

# Verification: query the timeseries for the stored error entry
stored_error_chain = self.timeseries_db.find_entries(["stats/dashboard_error"], time_query=None)

# Convert the chain to a list to make it subscriptable and to allow multiple accesses
stored_error = list(stored_error_chain)

# Assert that at least one document was retrieved
self.assertTrue(
len(stored_error) > 0,
f"Data for '{function_name}' was not found in the database."
)
stored_error = stored_error[0]
# Inspect the contents of the first stored error document
try:
stored_ts_error = stored_error['data']['ts']
stored_reading_error = stored_error['data']['reading']
stored_name_error = stored_error['data']['name']
logging.debug(
f"Stored Document for '{function_name}': ts={stored_ts_error}, reading={stored_reading_error}, name={stored_name_error}"
)
except KeyError as e:
self.fail(
f"Missing key {e} in stored document for '{function_name}'."
)
# Assert that the stored_reading_error matches elapsed_ms exactly
self.assertEqual(
stored_reading_error,
elapsed_ms,
msg=(
f"Stored error reading {stored_reading_error} ms does not exactly match "
f"elapsed time {elapsed_ms:.2f} ms for '{function_name}'."
)
)

# Assert that the stored_name_error matches function_name exactly
self.assertEqual(
stored_name_error,
function_name,
msg=(
f"Stored error name '{stored_name_error}' does not match function "
f"name '{function_name}'."
)
)

# Re-raise the exception to let the test fail
raise


# Define the test functions outside the TestCase class
def test_function_no_delay():
logging.info("Executing test_function_no_delay")
return True

def test_function_short_delay():
logging.info("Executing test_function_short_delay")
time.sleep(1) # Simulate processing time
return True # Indicate successful execution

def test_function_long_delay():
logging.info("Executing test_function_long_delay")
time.sleep(5)
return True

def test_function_faulty():
logging.info("Executing test_function_faulty")
time.sleep(1)
raise ValueError("Simulated error in test_function_faulty")


if __name__ == "__main__":
unittest.main()