Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stats Timing #986

Merged
merged 42 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
acf7f3e
Typo Fix
TeachMeTW Oct 2, 2024
553a45d
Added Func Time Collection?
TeachMeTW Oct 4, 2024
d19392d
Fixed per feedback, removed decorator
TeachMeTW Oct 5, 2024
05bac58
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
e23d4b6
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
7fc3f81
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
e7e50da
Update emission/storage/timeseries/builtin_timeseries.py
TeachMeTW Oct 12, 2024
dd253dd
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
8957a06
Review Changes
TeachMeTW Oct 12, 2024
f428069
Changes
TeachMeTW Oct 13, 2024
4f636a9
Reverted some
TeachMeTW Oct 14, 2024
b2e3cb5
Added verification
TeachMeTW Oct 14, 2024
a1ff54e
resolve
TeachMeTW Oct 14, 2024
12d5e86
Fixed whitespace commits
TeachMeTW Oct 14, 2024
cf5b5a1
Removed unused imports
TeachMeTW Oct 14, 2024
2b5cefd
Removed unused imports
TeachMeTW Oct 14, 2024
5ac74fa
Delete miniconda.sh
TeachMeTW Oct 14, 2024
64cfc3f
Removed unnecessary
TeachMeTW Oct 14, 2024
a7884e5
Merge branch 'feature/stats_timing' of https://github.com/TeachMeTW/e…
TeachMeTW Oct 14, 2024
196e410
Timeseries
TeachMeTW Oct 14, 2024
fc3f0ee
Used TimeSeries Interface and made it so _get_query accepts dict and …
TeachMeTW Oct 15, 2024
d02f186
Used get entry at ts
TeachMeTW Oct 15, 2024
b6bb6b9
Refactor
TeachMeTW Oct 15, 2024
bc9f638
Turned into unit test
TeachMeTW Oct 15, 2024
f814102
Wrong comment
TeachMeTW Oct 15, 2024
5066c85
Update .gitignore
TeachMeTW Oct 15, 2024
9996707
Update emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW Oct 15, 2024
fed4964
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 15, 2024
09f93df
remvoed whitespace
TeachMeTW Oct 15, 2024
9c418c8
Used find_entries
TeachMeTW Oct 15, 2024
5ba94d7
Jack Changes
TeachMeTW Oct 15, 2024
7f12f67
Reverts
TeachMeTW Oct 16, 2024
4c7b80b
Reverted Non_user
TeachMeTW Oct 16, 2024
e843169
Added more tests
TeachMeTW Oct 16, 2024
9f9ec9e
Update emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW Oct 16, 2024
d76f63a
Update emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW Oct 16, 2024
f5b07a4
Implemented Review Changes
TeachMeTW Oct 16, 2024
a24b03b
simplify tests for store_dashboard_time / store_dashboard_error
JGreenlee Oct 17, 2024
39f763d
move TestFunctionTiming -> TestStatsQueries
JGreenlee Oct 17, 2024
1bdc922
Merge pull request #1 from JGreenlee/simpler_stats_timing
TeachMeTW Oct 17, 2024
db1e7bb
Added comments and changed param
TeachMeTW Oct 17, 2024
0b39803
Toned back comments
TeachMeTW Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions emission/core/wrapper/entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ def _getData2Wrapper():
"stats/server_api_error": "statsevent",
# pipeline stage time, measured on the server
"stats/pipeline_time": "statsevent",
# dashboard time, measured on the server
"stats/dashboard_time": "statsevent",
# intended to log the occurrence of errors in the pipeline
"stats/pipeline_error": "statsevent",
# intended to log the occurrence of errors in the dashboard
"stats/dashboard_error": "statsevent",
# time for various client operations, measured on the client
# comparison with the server_api_time can help debug networking issues
"stats/client_time": "statsevent",
Expand Down
2 changes: 1 addition & 1 deletion emission/pipeline/intake_stage.py
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
eaum.match_incoming_user_inputs(uuid)

esds.store_pipeline_time(uuid, ecwp.PipelineStages.USER_INPUT_MATCH_INCOMING.name,
time.time(), uct.elapsed)
time.time(), uit.elapsed)

# Hack until we delete these spurious entries
# https://github.com/e-mission/e-mission-server/issues/407#issuecomment-2484868
Expand Down
22 changes: 21 additions & 1 deletion emission/storage/decorations/stats_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from builtins import *
import logging
import time
from functools import wraps
from typing import Callable, Any

# Our imports
import emission.storage.timeseries.abstract_timeseries as esta
Expand Down Expand Up @@ -44,5 +46,23 @@ def store_stats_entry(user_id, metadata_key, name, ts, reading):
"reading": reading
}
new_entry = ecwe.Entry.create_entry(user_id, metadata_key, data)
return esta.TimeSeries.get_time_series(user_id).insert(new_entry)
return esta.TimeSeries.get_non_user_time_series().insert(new_entry)
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved

def store_dashboard_time(code_fragment_name: str, ts: float, reading: float):
    """
    Record the execution time of an instrumented piece of dashboard code.

    Dashboard metrics are aggregate rather than per-user, so no user_id is
    accepted here; the entry is stored with a user of None and is identified
    only by the name of the instrumented step.

    Parameters:
    - code_fragment_name (str): Name of the step/function that was timed.
    - ts (float): Timestamp at which the timed execution started.
    - reading (float): Duration of the execution, in milliseconds.

    Returns:
    - Whatever store_stats_entry returns for the insert operation.
    """
    metadata_key = "stats/dashboard_time"
    no_user = None
    insert_result = store_stats_entry(no_user, metadata_key, code_fragment_name, ts, reading)
    return insert_result


def store_dashboard_error(code_fragment_name: str, ts: float, reading: float):
    """
    Stores the occurrence of an error in dashboard code.

    Mirrors store_dashboard_time: the entry is stored without a user_id,
    under the "stats/dashboard_error" key, and is identified by the name of
    the step that failed.

    Parameters:
    - code_fragment_name (str): The name of the function/step that failed.
    - ts (float): The timestamp associated with the failed execution.
    - reading (float): The value to record with the error.

    Returns:
    - The result of the insert operation, as returned by store_stats_entry
      (previously discarded; returned now for parity with store_dashboard_time).
    """
    return store_stats_entry(None, "stats/dashboard_error", code_fragment_name, ts, reading)

1 change: 1 addition & 0 deletions emission/storage/timeseries/builtin_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self, user_id):
"stats/server_api_time": self.timeseries_db,
"stats/server_api_error": self.timeseries_db,
"stats/pipeline_time": self.timeseries_db,
"stats/dashboard_time": self.timeseries_db,
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
"stats/pipeline_error": self.timeseries_db,
"stats/client_time": self.timeseries_db,
"stats/client_nav_event": self.timeseries_db,
Expand Down
51 changes: 32 additions & 19 deletions emission/storage/timeseries/non_user_timeseries.py
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

import emission.core.get_database as edb
import emission.storage.timeseries.builtin_timeseries as bits
import emission.core.wrapper.entry as ecwe # Added missing import
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved

class NonUserTimeSeries(bits.BuiltinTimeSeries):
def __init__(self):
super(AggregateTimeSeries, self).__init__(None)
super(NonUserTimeSeries, self).__init__(None) # Corrected superclass name
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
self.user_query = {}
self.timeseries_db = edb.get_non_user_timeseries_db()

Expand All @@ -17,26 +18,27 @@ def get_uuid_list():
return []

def get_timeseries_db(self, key):
return self.timeseries_db
return self.timeseries_db # Ensure this is intended to ignore 'key'
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved

# _get_query: not overridden
# _get_sort_query: not overridden
# _to_df_entry: not overridden
# df_row_to_entry: not overridden
# get_entry_from_id: not overridden

def find_entries(self, key_list = None, time_query = None, geo_query = None,
extra_query_list=None):
def find_entries(self, key_list=None, time_query=None, geo_query=None,
extra_query_list=None):
sort_key = self._get_sort_key(time_query)
logging.debug("curr_query = %s, sort_key = %s" %
(self._get_query(key_list, time_query, geo_query,
extra_query_list), sort_key))
ts_db_result = self._get_entries_for_timeseries(self.timeseries_db,
key_list,
time_query,
geo_query,
extra_query_list,
sort_key)
current_query = self._get_query(key_list, time_query, geo_query, extra_query_list)
logging.debug(f"curr_query = {current_query}, sort_key = {sort_key}")
ts_db_result = self._get_entries_for_timeseries(
self.timeseries_db,
key_list,
time_query,
geo_query,
extra_query_list,
sort_key
)
return ts_db_result

# _get_entries_for_timeseries is unchanged
Expand All @@ -51,16 +53,28 @@ def insert(self, entry):
Inserts the specified entry and returns the object ID
"""
logging.debug("insert called")
if type(entry) == dict:
if isinstance(entry, dict):
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
entry = ecwe.Entry(entry)
if entry["user_id"] is not None:
raise AttributeError("Saving entry %s for %s in non_user_timeseries" %
(entry, entry["user_id"]))
raise AttributeError(
f"Saving entry {entry} for {entry['user_id']} in non_user_timeseries is not allowed."
)
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
else:
logging.debug("entry was fine, no need to fix it")

logging.debug("Inserting entry %s into timeseries" % entry)
return self.get_timeseries_db(entry.metadata.key).insert(entry)
# Get the collection and log its full name
collection = self.get_timeseries_db(entry.metadata.key)
logging.debug(f"Collection used for insertion: {collection.full_name}")

logging.debug(f"Inserting entry {entry} into timeseries")
try:
result = collection.insert_one(entry)
logging.debug(f"Inserted entry with ID: {result.inserted_id}")
return result.inserted_id
except pymongo.errors.PyMongoError as e:
logging.error(f"Failed to insert entry: {e}")
raise
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved

TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved

# insert_data is unchanged
def insert_error(self, entry):
Expand All @@ -87,4 +101,3 @@ def update_data(user_id, key, obj_id, data):
versioned objects
"""
raise AttributeError("non_user_timeseries does not support updates")

95 changes: 95 additions & 0 deletions emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# emission/tests/funcTests/TestFunctionTiming.py

import logging
import time
from typing import Callable, List

# Import the store_dashboard_time and store_dashboard_error functions
from emission.storage.decorations.stats_queries import (
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
store_dashboard_time,
store_dashboard_error
)

# Import the existing Timer context manager
from emission.core.timer import Timer as ECT_Timer
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved


# Define test functions
def test_function_1(delay: float = 1.0):
    """
    Simulate a successful unit of work: log, sleep, and report success.

    Parameters:
    - delay (float): Seconds to sleep to simulate processing time. Defaults
      to 1 second, so calling with no arguments behaves as before.

    Returns:
    - bool: Always True, indicating successful execution.
    """
    logging.info("Executing test_function_1")
    time.sleep(delay)  # Simulate processing time
    return True  # Indicate successful execution

def test_function_2(delay: float = 2.0):
    """
    Simulate a successful unit of work: log, sleep, and report success.

    Parameters:
    - delay (float): Seconds to sleep to simulate processing time. Defaults
      to 2 seconds, so calling with no arguments behaves as before.

    Returns:
    - bool: Always True, indicating successful execution.
    """
    logging.info("Executing test_function_2")
    time.sleep(delay)  # Simulate processing time
    return True

def test_function_faulty():
logging.info("Executing test_function_faulty")
time.sleep(1)
raise ValueError("Simulated error in test_function_faulty")

def test_function_3(delay: float = 3.0):
    """
    Simulate a successful unit of work: log, sleep, and report success.

    Parameters:
    - delay (float): Seconds to sleep to simulate processing time. Defaults
      to 3 seconds, so calling with no arguments behaves as before.

    Returns:
    - bool: Always True, indicating successful execution.
    """
    logging.info("Executing test_function_3")
    time.sleep(delay)  # Simulate processing time
    return True

def execute_and_time_function(func: Callable[[], bool]):
    """
    Executes a given function, measures its execution time using ECT_Timer,
    and stores the timing information using store_dashboard_time.
    If the function raises an exception, it stores the error using
    store_dashboard_error and logs it instead of re-raising, so the caller
    can carry on with the next function.

    Only the timed call itself is guarded by the exception handler: a failure
    while storing a successful measurement propagates to the caller rather
    than being misreported as a failure of ``func``.

    Parameters:
    - func (Callable[[], bool]): The test function to execute and time.
    """
    function_name = func.__name__
    timestamp = time.time()

    logging.info(f"Starting timing for function: {function_name}")

    # Pre-bind the name so the handler can tell "timer never started" apart
    # from "function failed while being timed" without inspecting locals().
    timer = None
    try:
        with ECT_Timer() as timer:
            func()  # Execute the test function; its return value is not used
    except Exception as e:
        # Even if the function fails, capture the elapsed time up to the
        # exception; fall back to 0 if the timer could not be started.
        elapsed_seconds = timer.elapsed if timer is not None else 0
        elapsed_ms = elapsed_seconds * 1000  # Convert to milliseconds

        # Store the error timing
        store_dashboard_error(
            code_fragment_name=function_name,
            ts=timestamp,
            reading=elapsed_ms
        )
        print(f"Function '{function_name}' failed after {elapsed_ms:.2f} ms with error: {e}")
        logging.error(f"Function '{function_name}' failed after {elapsed_ms:.2f} ms with error: {e}")
    else:
        elapsed_ms = timer.elapsed * 1000  # Convert to milliseconds

        # Store the execution time (outside the try, so storage errors surface)
        store_dashboard_time(
            code_fragment_name=function_name,
            ts=timestamp,
            reading=elapsed_ms
        )
        print(f"Function '{function_name}' executed successfully in {elapsed_ms:.2f} ms.")
        logging.info(f"Function '{function_name}' executed successfully in {elapsed_ms:.2f} ms.")

def main():
    """Run every sample function through execute_and_time_function, in order."""
    # NOTE(review): test_function_faulty is not part of this run (it was
    # commented out in the original list) - confirm whether that is intended.
    functions_to_time = (
        test_function_1,
        test_function_2,
        test_function_3,
    )
    # Execute and time each function
    for timed_function in functions_to_time:
        execute_and_time_function(timed_function)

if __name__ == "__main__":
main()
Loading