Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stats Timing #986

Merged
merged 42 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
acf7f3e
Typo Fix
TeachMeTW Oct 2, 2024
553a45d
Added Func Time Collection?
TeachMeTW Oct 4, 2024
d19392d
Fixed per feedback, removed decorator
TeachMeTW Oct 5, 2024
05bac58
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
e23d4b6
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
7fc3f81
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
e7e50da
Update emission/storage/timeseries/builtin_timeseries.py
TeachMeTW Oct 12, 2024
dd253dd
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
8957a06
Review Changes
TeachMeTW Oct 12, 2024
f428069
Changes
TeachMeTW Oct 13, 2024
4f636a9
Reverted some
TeachMeTW Oct 14, 2024
b2e3cb5
Added verification
TeachMeTW Oct 14, 2024
a1ff54e
resolve
TeachMeTW Oct 14, 2024
12d5e86
Fixed whitespace commits
TeachMeTW Oct 14, 2024
cf5b5a1
Removed unused imports
TeachMeTW Oct 14, 2024
2b5cefd
Removed unused imports
TeachMeTW Oct 14, 2024
5ac74fa
Delete miniconda.sh
TeachMeTW Oct 14, 2024
64cfc3f
Removed unnecessary
TeachMeTW Oct 14, 2024
a7884e5
Merge branch 'feature/stats_timing' of https://github.com/TeachMeTW/e…
TeachMeTW Oct 14, 2024
196e410
Timeseries
TeachMeTW Oct 14, 2024
fc3f0ee
Used TimeSeries Interface and made it so _get_query accepts dict and …
TeachMeTW Oct 15, 2024
d02f186
Used get entry at ts
TeachMeTW Oct 15, 2024
b6bb6b9
Refactor
TeachMeTW Oct 15, 2024
bc9f638
Turned into unit test
TeachMeTW Oct 15, 2024
f814102
Wrong comment
TeachMeTW Oct 15, 2024
5066c85
Update .gitignore
TeachMeTW Oct 15, 2024
9996707
Update emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW Oct 15, 2024
fed4964
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 15, 2024
09f93df
removed whitespace
TeachMeTW Oct 15, 2024
9c418c8
Used find_entries
TeachMeTW Oct 15, 2024
5ba94d7
Jack Changes
TeachMeTW Oct 15, 2024
7f12f67
Reverts
TeachMeTW Oct 16, 2024
4c7b80b
Reverted Non_user
TeachMeTW Oct 16, 2024
e843169
Added more tests
TeachMeTW Oct 16, 2024
9f9ec9e
Update emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW Oct 16, 2024
d76f63a
Update emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW Oct 16, 2024
f5b07a4
Implemented Review Changes
TeachMeTW Oct 16, 2024
a24b03b
simplify tests for store_dashboard_time / store_dashboard_error
JGreenlee Oct 17, 2024
39f763d
move TestFunctionTiming -> TestStatsQueries
JGreenlee Oct 17, 2024
1bdc922
Merge pull request #1 from JGreenlee/simpler_stats_timing
TeachMeTW Oct 17, 2024
db1e7bb
Added comments and changed param
TeachMeTW Oct 17, 2024
0b39803
Toned back comments
TeachMeTW Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions conf/log/function_pipeline.conf
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"detailed": {
"class": "logging.Formatter",
"format": "%(asctime)s:%(levelname)s:%(thread)d:%(message)s"
}
},
"handlers": {
"errors": {
"class": "logging.handlers.RotatingFileHandler",
"level": "ERROR",
"formatter": "detailed",
"filename": "/var/tmp/function_pipeline-errors.log",
"mode": "a",
"maxBytes": 1073741824,
"backupCount": 2,
"encoding": "UTF-8"
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"level": "DEBUG",
"formatter": "detailed",
"filename": "/var/tmp/function_pipeline.log",
"mode": "a",
"maxBytes": 1073741824,
"backupCount": 8,
"encoding": "UTF-8"
},
"console": {
"class": "logging.StreamHandler",
"level": "WARNING",
"formatter": "detailed",
"stream": "ext://sys.stdout"
}
},
"root": {
"handlers": [
"console",
"file",
"errors"
],
"level": "DEBUG"
}
}
2 changes: 2 additions & 0 deletions emission/core/wrapper/entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ def _getData2Wrapper():
"stats/server_api_error": "statsevent",
# pipeline stage time, measured on the server
"stats/pipeline_time": "statsevent",
# function time, measured on the server
"stats/function_time": "statsevent",
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
# intended to log the occurrence of errors in the pipeline
"stats/pipeline_error": "statsevent",
# time for various client operations, measured on the client
Expand Down
82 changes: 82 additions & 0 deletions emission/functions/time_functions.py
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import json
import logging
import logging.config
import os
import time
import numpy as np
import arrow
import pymongo

import emission.core.get_database as edb
import emission.core.timer as ect

import emission.storage.decorations.stats_queries as esds

def run_function_pipeline(process_number, function_list, skip_if_no_new_data=False):
    """
    Run the function pipeline with the specified process number and function list.
    Note that the process_number is only really used to customize the log file name
    We could avoid passing it in by using the process id - os.getpid() instead, but
    then we won't get the nice RotatingLogHandler properties such as auto-deleting
    files if there are too many. Maybe it will work properly with logrotate? Need to check

    :param process_number: id representing the process number. In range (0..n)
    :param function_list: the list of functions that this process will handle;
        entries that are None are skipped
    :param skip_if_no_new_data: flag to skip function execution based on custom logic
    :return: None
    """
    # Prefer the deployed config; fall back to the sample shipped with the repo
    try:
        with open("conf/log/function_pipeline.conf", "r") as cf:
            pipeline_log_config = json.load(cf)
    except FileNotFoundError:
        with open("conf/log/function_pipeline.conf.sample", "r") as cf:
            pipeline_log_config = json.load(cf)

    # Customize log filenames with process number so parallel processes
    # don't write to the same rotating files
    pipeline_log_config["handlers"]["file"]["filename"] = \
        pipeline_log_config["handlers"]["file"]["filename"].replace("function_pipeline", f"function_pipeline_{process_number}")
    pipeline_log_config["handlers"]["errors"]["filename"] = \
        pipeline_log_config["handlers"]["errors"]["filename"].replace("function_pipeline", f"function_pipeline_{process_number}")

    logging.config.dictConfig(pipeline_log_config)
    np.random.seed(61297777)

    logging.info(f"Processing function list: { [func.__name__ for func in function_list if func is not None] }")

    for func in function_list:
        # BUGFIX: check for None BEFORE touching func.__name__ — the original
        # accessed __name__ first, which would raise AttributeError on None
        # entries instead of skipping them
        if func is None:
            continue
        func_name = func.__name__

        try:
            run_function_pipeline_step(func, skip_if_no_new_data)
        except Exception as e:
            # Record the failure (reading=None marks an error entry) and keep
            # going with the remaining functions
            esds.store_function_error(func_name, "WHOLE_PIPELINE", time.time(), None)
            logging.exception(f"Found error {e} while processing pipeline for function {func_name}, skipping")

def run_function_pipeline_step(func, skip_if_no_new_data):
    """
    Execute a single step in the function pipeline.

    Times the call with ect.Timer and records the elapsed duration via
    esds.store_function_time under the "EXECUTION" stage.

    :param func: The function to execute
    :param skip_if_no_new_data: Flag to determine if the function should be skipped based on custom logic
    :return: None
    """
    func_name = func.__name__

    # Run the function inside a timer so we capture its wall-clock duration
    with ect.Timer() as exec_timer:
        logging.info(f"********** Function {func_name}: Starting execution **********")
        print(f"{arrow.now()} ********** Function {func_name}: Starting execution **********")
        result = func()

    # Store the execution time
    esds.store_function_time(func_name, "EXECUTION",
                             time.time(), exec_timer.elapsed)

    # A falsy result combined with the skip flag short-circuits the step
    if skip_if_no_new_data and not result:
        print(f"No new data for {func_name}, and skip_if_no_new_data = {skip_if_no_new_data}, skipping the rest of the pipeline")
        return

    print(f"Function {func_name} executed with result = {result} and skip_if_no_new_data = {skip_if_no_new_data}, continuing")
    logging.info(f"********** Function {func_name}: Completed execution **********")
2 changes: 1 addition & 1 deletion emission/pipeline/intake_stage.py
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
eaum.match_incoming_user_inputs(uuid)

esds.store_pipeline_time(uuid, ecwp.PipelineStages.USER_INPUT_MATCH_INCOMING.name,
time.time(), uct.elapsed)
time.time(), uit.elapsed)

# Hack until we delete these spurious entries
# https://github.com/e-mission/e-mission-server/issues/407#issuecomment-2484868
Expand Down
21 changes: 21 additions & 0 deletions emission/storage/decorations/stats_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from builtins import *
import logging
import time
from functools import wraps
from typing import Callable, Any

# Our imports
import emission.storage.timeseries.abstract_timeseries as esta
Expand Down Expand Up @@ -46,3 +48,22 @@ def store_stats_entry(user_id, metadata_key, name, ts, reading):
new_entry = ecwe.Entry.create_entry(user_id, metadata_key, data)
return esta.TimeSeries.get_time_series(user_id).insert(new_entry)
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved

def store_function_time(user_id: str, stage_string: str, ts: float, reading: float):
    """
    Stores the execution time of a function.

    Parameters:
    - user_id (str): The ID of the user.
      NOTE(review): callers in time_functions.py pass the function name here —
      confirm whether this should be a real user id or a non-user key.
    - stage_string (str): The name of the function being timed.
    - ts (float): The timestamp when the function execution started.
    - reading (float): The duration of the function execution in milliseconds.

    Returns:
    - InsertResult: The result of the insert operation.
    """
    # BUGFIX: the docstring promises an InsertResult, but the original
    # discarded store_stats_entry's return value — propagate it to the caller
    return store_stats_entry(user_id, "stats/function_time", stage_string, ts, reading)
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved


def store_function_error(user_id, stage_string, ts, reading):
    """
    Stores an error entry for a function pipeline step.

    Parameters:
    - user_id: The ID of the user (callers may pass the function name — see
      store_function_time).
    - stage_string: The name of the function that errored.
    - ts: The timestamp when the error occurred.
    - reading: Error payload; callers pass None to mark a failure.

    Returns:
    - InsertResult: The result of the insert operation.
    """
    # NOTE(review): this writes to "stats/function_time", the same metadata key
    # as the timing entries — confirm a dedicated error key was not intended.
    # The key is left unchanged because it is the only one registered in
    # entry.py / builtin_timeseries.py.
    return store_stats_entry(user_id, "stats/function_time", stage_string, ts, reading)

1 change: 1 addition & 0 deletions emission/storage/timeseries/builtin_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self, user_id):
"stats/server_api_time": self.timeseries_db,
"stats/server_api_error": self.timeseries_db,
"stats/pipeline_time": self.timeseries_db,
"stats/function_time": self.timeseries_db,
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
"stats/pipeline_error": self.timeseries_db,
"stats/client_time": self.timeseries_db,
"stats/client_nav_event": self.timeseries_db,
Expand Down
49 changes: 49 additions & 0 deletions emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# emission/tests/funcTests/TestFunctionTiming.py

import json
import logging
import logging.config
import os
import time
import numpy as np
import arrow
from contextlib import contextmanager

# Import the run_function_pipeline function from time_functions.py
from emission.functions.time_functions import run_function_pipeline

# Define test functions
def test_function_1(delay=1):
    """Simulate a successful pipeline function.

    :param delay: seconds to sleep to simulate processing time
        (default 1, matching the original hard-coded value)
    :return: True to indicate successful execution
    """
    logging.info("Executing test_function_1")
    time.sleep(delay)  # Simulate processing time
    return True  # Indicate successful execution

def test_function_2(delay=1):
    """Simulate a second successful pipeline function.

    :param delay: seconds to sleep to simulate processing time
        (default 1, matching the original hard-coded value)
    :return: True to indicate successful execution
    """
    logging.info("Executing test_function_2")
    time.sleep(delay)
    return True

def test_function_faulty():
logging.info("Executing test_function_faulty")
time.sleep(1)
raise ValueError("Simulated error in test_function_faulty")

def test_function_3(delay=1):
    """Simulate a successful pipeline function that runs after the faulty one.

    :param delay: seconds to sleep to simulate processing time
        (default 1, matching the original hard-coded value)
    :return: True to indicate successful execution
    """
    logging.info("Executing test_function_3")
    time.sleep(delay)
    return True

if __name__ == "__main__":
    # Make sure there is somewhere for log output to land
    os.makedirs("logs", exist_ok=True)

    # Pipeline under test: two healthy functions, one that raises, and one
    # more to verify that execution continues past the failure
    functions_to_run = [
        test_function_1,
        test_function_2,
        test_function_faulty,  # raises ValueError
        test_function_3,       # should still run after the faulty one
    ]

    # Process number 1 customizes the log file name; skip_if_no_new_data=True
    # exercises the skip path for functions that return a falsy result
    run_function_pipeline(process_number=1, function_list=functions_to_run, skip_if_no_new_data=True)
Loading