From 40b63f3d6bcaa42521338b873cd3c13a2fa5f094 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 4 Dec 2024 13:53:16 +0100 Subject: [PATCH 01/18] feat: Create default logger, gitignore logs --- .gitignore | 2 + policyengine_api/api.py | 5 +- policyengine_api/utils/__init__.py | 1 + policyengine_api/utils/logger.py | 395 +++++++++++++++++++++++++++++ 4 files changed, 402 insertions(+), 1 deletion(-) create mode 100644 policyengine_api/utils/logger.py diff --git a/.gitignore b/.gitignore index 91af8c3c..c3a5c3d6 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,8 @@ dist/* **/*.h5 **/*.csv.gz .env +logs/* +*.log # Ignore generated credentials from google-github-actions/auth gha-creds-*.json diff --git a/policyengine_api/api.py b/policyengine_api/api.py index 3f3bd725..d4d49833 100644 --- a/policyengine_api/api.py +++ b/policyengine_api/api.py @@ -7,7 +7,7 @@ import flask import yaml from flask_caching import Cache -from policyengine_api.utils import make_cache_key +from policyengine_api.utils import make_cache_key, Logger from .constants import VERSION # from werkzeug.middleware.profiler import ProfilerMiddleware @@ -40,6 +40,9 @@ print("Initialising API...") +# Set up logging +logger = Logger() + app = application = flask.Flask(__name__) app.config.from_mapping( diff --git a/policyengine_api/utils/__init__.py b/policyengine_api/utils/__init__.py index 07a267c6..fb39956e 100644 --- a/policyengine_api/utils/__init__.py +++ b/policyengine_api/utils/__init__.py @@ -2,3 +2,4 @@ from .cache_utils import * from .singleton import Singleton from .get_current_law import get_current_law_policy_id +from .logger import Logger diff --git a/policyengine_api/utils/logger.py b/policyengine_api/utils/logger.py new file mode 100644 index 00000000..c440a8fb --- /dev/null +++ b/policyengine_api/utils/logger.py @@ -0,0 +1,395 @@ +import logging +import sys +from rq import Worker, get_current_job +from google.cloud import logging as cloud_logging +from datetime import datetime +import time +import threading +import psutil +from typing import Optional +from pathlib import Path +import os +from weakref import proxy +import signal +import uuid + + +class Logger: + def __init__( + self, + logger_root_dir="logs", + logger_name="default", + log_to_cloud=True, + ): + """ + Initialize standard logger + + Three-part filepath: + - log_dir (defaults to "logs") + - logger_name (defaults to "default") + - logger_id (unique identifier for this logging session) + + Args: + log_to_cloud (bool): Whether to log to Google Cloud Logging + log_root_dir (str): Directory to store local log files (defaults to "logs") + """ + # Check if running in debug; if so, don't initialize before Werkzeug, + # otherwise we'll generate two log files, one which will be empty + if ( + os.environ.get("FLASK_DEBUG") == "1" + and os.environ.get("WERKZEUG_RUN_MAIN") != "true" + ): + print("Skipping logger initialization in debug mode pre-Werkzeug") + return + + self.logger_root_dir = logger_root_dir + self.logger_name = logger_name + self.logger = logging.getLogger(logger_name) + self.logger.setLevel(logging.INFO) + + # Generate a unique identifier for this logging session + self.logger_id = uuid.uuid4().hex + + # Create log directory if it doesn't exist + self.logger_full_dir = Path(self.logger_root_dir).joinpath(logger_name) + try: + self.logger_full_dir.mkdir(parents=True, exist_ok=True) + except Exception as e: + print( + f"Warning: Could not create log directory {self.logger_full_dir}: {str(e)}" + ) + # Fall back to current directory + 
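+            # A usage sketch of the three-part path scheme described in the
+            # docstring above (hypothetical name; values are illustrative only):
+            #
+            #     logger = Logger(logger_name="api")
+            #     # -> file handler writes to logs/api/<logger_id>.log
+            #     logger.log("Request served", endpoint="/economy", status=200)
+            #     # -> "Request served | endpoint=/economy status=200"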
self.log_dir = Path(".") + + self.memory_monitor = None + self.cloud_client = None + self.cloud_logger = None + print(f"Initialized logger") + + # Prevent duplicate handlers + if not self.logger.handlers: + # File handler - logs to local file + self.log_file = self.logger_full_dir / f"{self.logger_id}.log" + file_handler = logging.FileHandler(str(self.log_file)) + print( + f"Logging to file: logs/{self.logger_name}/{self.logger_id}.log" + ) + file_handler.setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + ) + self.logger.addHandler(file_handler) + + # Console handler + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + ) + self.logger.addHandler(console_handler) + + # Google Cloud Logging handler + if log_to_cloud: + cloud_client = cloud_logging.Client() + cloud_handler = cloud_logging.handlers.CloudLoggingHandler( + cloud_client, name=f"{self.logger_name}/{self.logger_id}" + ) + cloud_handler.setFormatter( + logging.Formatter("%(levelname)s - %(message)s") + ) + self.logger.addHandler(cloud_handler) + + def log(self, message, level="info", **context): + """ + Log a message with optional context data + """ + # Format message with context if provided + if context: + context_str = " ".join(f"{k}={v}" for k, v in context.items()) + message = f"{message} | {context_str}" + + log_func = getattr(self.logger, level.lower()) + log_func(message) + + +# class WorkerLogger: +# @staticmethod +# def get_worker_id(): +# """ +# Attempts to get the worker ID through various methods: +# 1. From current RQ job +# 2. From environment variable +# 3. From RQ worker name +# 4. Generates a default if none found +# """ +# # Try to get from current job context +# current_job = get_current_job() +# if current_job and current_job.worker_name: +# return current_job.worker_name +# +# # Try to get from current worker +# try: +# worker = Worker.find_by_key( +# Worker.worker_key_prefix + current_job.worker_name +# ) +# if worker: +# return worker.name +# except: +# pass +# +# # Default to timestamp if no other ID found +# return datetime.now().strftime("%Y%m%d_%H%M%S") +# +# def __init__( +# self, +# worker_id=None, +# job_id=None, +# log_to_cloud=True, +# log_dir="logs", +# monitor_memory=True, +# memory_threshold=75, +# memory_check_interval=5, +# ): +# """ +# Initialize logger with automatic worker ID detection if none provided +# +# Args: +# worker_id (str): Optional worker ID +# log_to_cloud (bool): Whether to log to Google Cloud Logging +# log_dir (str): Directory to store local log files (defaults to "logs") +# monitor_memory (bool): Whether to monitor memory usage +# memory_threshold (int): Memory usage threshold to trigger warnings (default: 90%) +# memory_check_interval (int): How often to check memory in seconds (default: 5) +# """ +# self.worker_id = worker_id or self.get_worker_id() +# self.logger = logging.getLogger(f"worker_{self.worker_id}") +# self.logger.setLevel(logging.INFO) +# +# self.log_dir = Path(log_dir) +# +# # Create log directory if it doesn't exist +# self.log_dir = Path(log_dir) +# try: +# self.log_dir.mkdir(parents=True, exist_ok=True) +# except Exception as e: +# print( +# f"Warning: Could not create log directory {log_dir}: {str(e)}" +# ) +# # Fall back to current directory +# self.log_dir = Path(".") +# +# self.memory_monitor = None +# if monitor_memory: +# self.memory_monitor = MemoryMonitor( +# logger=self, +# 
threshold_percent=memory_threshold, +# check_interval=memory_check_interval, +# ) +# +# self.cloud_client = None +# self.cloud_logger = None +# print(f"Initialized worker logger with ID: {self.worker_id}") +# +# # Prevent duplicate handlers +# if not self.logger.handlers: +# # File handler - logs to local file +# log_file = self.log_dir / f"worker_{self.worker_id}.log" +# file_handler = logging.FileHandler(str(log_file)) +# print(f"Logging to file: logs/worker_{self.worker_id}.log") +# file_handler.setFormatter( +# logging.Formatter( +# "%(asctime)s - %(name)s - %(levelname)s - %(message)s" +# ) +# ) +# self.logger.addHandler(file_handler) +# +# # Console handler +# console_handler = logging.StreamHandler(sys.stdout) +# console_handler.setFormatter( +# logging.Formatter( +# "%(asctime)s - %(name)s - %(levelname)s - %(message)s" +# ) +# ) +# self.logger.addHandler(console_handler) +# +# # Google Cloud Logging handler +# if log_to_cloud: +# cloud_client = cloud_logging.Client() +# cloud_handler = cloud_logging.handlers.CloudLoggingHandler( +# cloud_client, name=f"worker_{self.worker_id}" +# ) +# cloud_handler.setFormatter( +# logging.Formatter("%(levelname)s - %(message)s") +# ) +# self.logger.addHandler(cloud_handler) +# +# def log(self, message, level="info", **context): +# """ +# Log a message with optional context data +# """ +# # Add job ID to context if available +# current_job = get_current_job() +# if current_job: +# context["job_id"] = current_job.id +# +# # Format message with context if provided +# if context: +# context_str = " ".join(f"{k}={v}" for k, v in context.items()) +# message = f"{message} | {context_str}" +# +# log_func = getattr(self.logger, level.lower()) +# log_func(message) +# +# def log_memory_stats( +# self, process_memory_mb, process_percent, system_percent +# ): +# """Log memory statistics""" +# self.log( +# "Memory usage stats", +# level="info", +# metric_type="memory_usage", +# process_memory_mb=round(process_memory_mb, 2), +# process_percent=round(process_percent, 2), +# system_percent=round(system_percent, 2), +# ) +# +# def log_memory_warning(self, message, **context): +# """Log memory warning""" +# self.log( +# message, level="warning", metric_type="memory_warning", **context +# ) +# +# def __enter__(self): +# """Context manager entry""" +# return self +# +# def __exit__(self, exc_type, exc_val, exc_tb): +# """Context manager exit - ensure cleanup""" +# if self.memory_monitor: +# self.memory_monitor.stop() +# +# +# class MemoryMonitor: +# def __init__(self, threshold_percent=90, check_interval=5, logger=None): +# """ +# Initialize memory monitor +# +# Args: +# threshold_percent (int): Memory usage threshold to trigger warnings (default: 75%) +# check_interval (int): How often to check memory in seconds (default: 5) +# """ +# self.threshold_percent = threshold_percent +# self.check_interval = check_interval +# self.stop_flag = threading.Event() +# self.monitor_thread: Optional[threading.Thread] = None +# self.logger = proxy(logger) +# self._pid = os.getpid() +# +# def start(self): +# """Start memory monitoring in a separate thread""" +# self.stop_flag.clear() +# self._pid = os.getpid() +# +# self.monitor_thread = threading.Thread(target=self._monitor_memory) +# self.monitor_thread.daemon = True +# self.monitor_thread.start() +# +# self._setup_signal_handlers() +# +# def stop(self): +# """Stop memory monitoring""" +# if self.monitor_thread and self.monitor_thread.is_alive(): +# self.stop_flag.set() +# self.monitor_thread.join(timeout=1.0) +# +# def 
_setup_signal_handlers(self): +# """Setup signal handlers to stop monitoring""" +# +# for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT): +# signal.signal(sig, self._handle_signal) +# +# def _handle_signal(self, signum, frame): +# """Signal handler to stop monitoring""" +# self.logger.log( +# f"Received signal {signum}, stopping memory monitor", +# level="critical", +# ) +# self.stop() +# +# def _monitor_memory(self): +# """Memory monitoring loop""" +# process = psutil.Process() +# while not self.stop_flag.is_set(): +# try: +# +# if os.getpid() != self._pid: +# self.logger.log( +# "Memory monitor detected PID mismatch, stopping", +# level="warning", +# ) +# break +# +# try: +# process = psutil.Process(self._pid) +# except psutil.NoSuchProcess: +# self.logger.log( +# "Memory monitor detected missing process, stopping", +# level="warning", +# ) +# break +# +# if not process.is_running(): +# self.logger.log( +# "Memory monitor detected process stopped, stopping", +# level="warning", +# ) +# break +# +# try: +# # Get memory info +# memory_info = process.memory_info() +# system_memory = psutil.virtual_memory() +# except Exception as e: +# self.logger.log( +# f"Error getting memory info: {str(e)}", +# level="error", +# error_type=type(e).__name__, +# ) +# break +# +# # Calculate usage percentages +# process_percent = (memory_info.rss / system_memory.total) * 100 +# system_percent = system_memory.percent +# +# # Log memory stats +# self.logger.log_memory_stats( +# process_memory_mb=memory_info.rss / (1024 * 1024), +# process_percent=process_percent, +# system_percent=system_percent, +# ) +# +# # Check for high memory usage +# if system_percent > self.threshold_percent: +# self.logger.log_memory_warning( +# f"High system memory usage: {system_percent:.1f}%", +# system_percent=system_percent, +# ) +# +# if process_percent > ( +# self.threshold_percent / 2 +# ): # Process threshold at half of system +# self.logger.log_memory_warning( +# f"High process memory usage: {process_percent:.1f}%", +# process_percent=process_percent, +# ) +# +# except Exception as e: +# self.logger.log( +# f"Error monitoring memory: {str(e)}", +# level="error", +# error_type=type(e).__name__, +# ) +# +# time.sleep(self.check_interval) From 2469c1cedb3fa1e7250d796ac8dea9abf7d3f40a Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 4 Dec 2024 15:33:52 +0100 Subject: [PATCH 02/18] fix: Fix logging to GCP --- policyengine_api/api.py | 6 +- policyengine_api/utils/logger.py | 112 +++++++++++++++++-------------- 2 files changed, 64 insertions(+), 54 deletions(-) diff --git a/policyengine_api/api.py b/policyengine_api/api.py index d4d49833..6759975c 100644 --- a/policyengine_api/api.py +++ b/policyengine_api/api.py @@ -38,11 +38,11 @@ get_simulations, ) -print("Initialising API...") - # Set up logging logger = Logger() +logger.log("Initialising API...") + app = application = flask.Flask(__name__) app.config.from_mapping( @@ -150,4 +150,4 @@ def get_specification(): return flask.jsonify(openapi_spec) -print("API initialised.") +logger.log("API initialised.") diff --git a/policyengine_api/utils/logger.py b/policyengine_api/utils/logger.py index c440a8fb..20347e69 100644 --- a/policyengine_api/utils/logger.py +++ b/policyengine_api/utils/logger.py @@ -35,10 +35,7 @@ def __init__( """ # Check if running in debug; if so, don't initialize before Werkzeug, # otherwise we'll generate two log files, one which will be empty - if ( - os.environ.get("FLASK_DEBUG") == "1" - and os.environ.get("WERKZEUG_RUN_MAIN") != "true" - ): + if 
os.environ.get("FLASK_DEBUG") == "1" and os.environ.get("WERKZEUG_RUN_MAIN") != "true": print("Skipping logger initialization in debug mode pre-Werkzeug") return @@ -71,9 +68,8 @@ def __init__( # File handler - logs to local file self.log_file = self.logger_full_dir / f"{self.logger_id}.log" file_handler = logging.FileHandler(str(self.log_file)) - print( - f"Logging to file: logs/{self.logger_name}/{self.logger_id}.log" - ) + file_handler.setLevel(logging.INFO) + print(f"Logging to file: logs/{self.logger_name}/{self.logger_id}.log") file_handler.setFormatter( logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" @@ -83,6 +79,7 @@ def __init__( # Console handler console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(logging.INFO) console_handler.setFormatter( logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" @@ -90,12 +87,18 @@ def __init__( ) self.logger.addHandler(console_handler) - # Google Cloud Logging handler - if log_to_cloud: - cloud_client = cloud_logging.Client() + # Google Cloud Logging handler; don't log to GCP if in debug + if log_to_cloud and os.environ.get("FLASK_DEBUG") != "1": + try: + cloud_client = google.cloud.logging.Client() + except Exception as e: + print(f"Google Cloud Logging error: {str(e)}") + return cloud_handler = cloud_logging.handlers.CloudLoggingHandler( - cloud_client, name=f"{self.logger_name}/{self.logger_id}" + cloud_client, name=f"{self.logger_name}" ) + cloud_handler = cloud_logging.handlers.CloudLoggingHandler(cloud_client) + cloud_handler.setLevel(logging.INFO) cloud_handler.setFormatter( logging.Formatter("%(levelname)s - %(message)s") ) @@ -105,6 +108,13 @@ def log(self, message, level="info", **context): """ Log a message with optional context data """ + + # Don't log if running in debug and Werkzeug not initialized; + # this will prevent duplicate log files + if getattr(self, "logger", None) is None: + print("Logger not initialized; skipping log") + return + # Format message with context if provided if context: context_str = " ".join(f"{k}={v}" for k, v in context.items()) @@ -128,7 +138,7 @@ def log(self, message, level="info", **context): # current_job = get_current_job() # if current_job and current_job.worker_name: # return current_job.worker_name -# +# # # Try to get from current worker # try: # worker = Worker.find_by_key( @@ -138,10 +148,10 @@ def log(self, message, level="info", **context): # return worker.name # except: # pass -# +# # # Default to timestamp if no other ID found # return datetime.now().strftime("%Y%m%d_%H%M%S") -# +# # def __init__( # self, # worker_id=None, @@ -154,7 +164,7 @@ def log(self, message, level="info", **context): # ): # """ # Initialize logger with automatic worker ID detection if none provided -# +# # Args: # worker_id (str): Optional worker ID # log_to_cloud (bool): Whether to log to Google Cloud Logging @@ -166,9 +176,9 @@ def log(self, message, level="info", **context): # self.worker_id = worker_id or self.get_worker_id() # self.logger = logging.getLogger(f"worker_{self.worker_id}") # self.logger.setLevel(logging.INFO) -# +# # self.log_dir = Path(log_dir) -# +# # # Create log directory if it doesn't exist # self.log_dir = Path(log_dir) # try: @@ -179,7 +189,7 @@ def log(self, message, level="info", **context): # ) # # Fall back to current directory # self.log_dir = Path(".") -# +# # self.memory_monitor = None # if monitor_memory: # self.memory_monitor = MemoryMonitor( @@ -187,11 +197,11 @@ def log(self, message, level="info", **context): 
# threshold_percent=memory_threshold, # check_interval=memory_check_interval, # ) -# +# # self.cloud_client = None # self.cloud_logger = None # print(f"Initialized worker logger with ID: {self.worker_id}") -# +# # # Prevent duplicate handlers # if not self.logger.handlers: # # File handler - logs to local file @@ -204,7 +214,7 @@ def log(self, message, level="info", **context): # ) # ) # self.logger.addHandler(file_handler) -# +# # # Console handler # console_handler = logging.StreamHandler(sys.stdout) # console_handler.setFormatter( @@ -213,7 +223,7 @@ def log(self, message, level="info", **context): # ) # ) # self.logger.addHandler(console_handler) -# +# # # Google Cloud Logging handler # if log_to_cloud: # cloud_client = cloud_logging.Client() @@ -224,7 +234,7 @@ def log(self, message, level="info", **context): # logging.Formatter("%(levelname)s - %(message)s") # ) # self.logger.addHandler(cloud_handler) -# +# # def log(self, message, level="info", **context): # """ # Log a message with optional context data @@ -233,15 +243,15 @@ def log(self, message, level="info", **context): # current_job = get_current_job() # if current_job: # context["job_id"] = current_job.id -# +# # # Format message with context if provided # if context: # context_str = " ".join(f"{k}={v}" for k, v in context.items()) # message = f"{message} | {context_str}" -# +# # log_func = getattr(self.logger, level.lower()) # log_func(message) -# +# # def log_memory_stats( # self, process_memory_mb, process_percent, system_percent # ): @@ -254,28 +264,28 @@ def log(self, message, level="info", **context): # process_percent=round(process_percent, 2), # system_percent=round(system_percent, 2), # ) -# +# # def log_memory_warning(self, message, **context): # """Log memory warning""" # self.log( # message, level="warning", metric_type="memory_warning", **context # ) -# +# # def __enter__(self): # """Context manager entry""" # return self -# +# # def __exit__(self, exc_type, exc_val, exc_tb): # """Context manager exit - ensure cleanup""" # if self.memory_monitor: # self.memory_monitor.stop() -# -# +# +# # class MemoryMonitor: # def __init__(self, threshold_percent=90, check_interval=5, logger=None): # """ # Initialize memory monitor -# +# # Args: # threshold_percent (int): Memory usage threshold to trigger warnings (default: 75%) # check_interval (int): How often to check memory in seconds (default: 5) @@ -286,30 +296,30 @@ def log(self, message, level="info", **context): # self.monitor_thread: Optional[threading.Thread] = None # self.logger = proxy(logger) # self._pid = os.getpid() -# +# # def start(self): # """Start memory monitoring in a separate thread""" # self.stop_flag.clear() # self._pid = os.getpid() -# +# # self.monitor_thread = threading.Thread(target=self._monitor_memory) # self.monitor_thread.daemon = True # self.monitor_thread.start() -# +# # self._setup_signal_handlers() -# +# # def stop(self): # """Stop memory monitoring""" # if self.monitor_thread and self.monitor_thread.is_alive(): # self.stop_flag.set() # self.monitor_thread.join(timeout=1.0) -# +# # def _setup_signal_handlers(self): # """Setup signal handlers to stop monitoring""" -# +# # for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT): # signal.signal(sig, self._handle_signal) -# +# # def _handle_signal(self, signum, frame): # """Signal handler to stop monitoring""" # self.logger.log( @@ -317,20 +327,20 @@ def log(self, message, level="info", **context): # level="critical", # ) # self.stop() -# +# # def _monitor_memory(self): # """Memory monitoring 
loop""" # process = psutil.Process() # while not self.stop_flag.is_set(): # try: -# +# # if os.getpid() != self._pid: # self.logger.log( # "Memory monitor detected PID mismatch, stopping", # level="warning", # ) # break -# +# # try: # process = psutil.Process(self._pid) # except psutil.NoSuchProcess: @@ -339,14 +349,14 @@ def log(self, message, level="info", **context): # level="warning", # ) # break -# +# # if not process.is_running(): # self.logger.log( # "Memory monitor detected process stopped, stopping", # level="warning", # ) # break -# +# # try: # # Get memory info # memory_info = process.memory_info() @@ -358,25 +368,25 @@ def log(self, message, level="info", **context): # error_type=type(e).__name__, # ) # break -# +# # # Calculate usage percentages # process_percent = (memory_info.rss / system_memory.total) * 100 # system_percent = system_memory.percent -# +# # # Log memory stats # self.logger.log_memory_stats( # process_memory_mb=memory_info.rss / (1024 * 1024), # process_percent=process_percent, # system_percent=system_percent, # ) -# +# # # Check for high memory usage # if system_percent > self.threshold_percent: # self.logger.log_memory_warning( # f"High system memory usage: {system_percent:.1f}%", # system_percent=system_percent, # ) -# +# # if process_percent > ( # self.threshold_percent / 2 # ): # Process threshold at half of system @@ -384,12 +394,12 @@ def log(self, message, level="info", **context): # f"High process memory usage: {process_percent:.1f}%", # process_percent=process_percent, # ) -# +# # except Exception as e: # self.logger.log( # f"Error monitoring memory: {str(e)}", # level="error", # error_type=type(e).__name__, # ) -# -# time.sleep(self.check_interval) +# +# time.sleep(self.check_interval) \ No newline at end of file From e8a4196ee797005a2299c6ef52510502ecf72f42 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 4 Dec 2024 15:36:14 +0100 Subject: [PATCH 03/18] fix: Switch UUID for timestamp --- policyengine_api/utils/logger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/policyengine_api/utils/logger.py b/policyengine_api/utils/logger.py index 20347e69..8e0fe72d 100644 --- a/policyengine_api/utils/logger.py +++ b/policyengine_api/utils/logger.py @@ -44,8 +44,8 @@ def __init__( self.logger = logging.getLogger(logger_name) self.logger.setLevel(logging.INFO) - # Generate a unique identifier for this logging session - self.logger_id = uuid.uuid4().hex + # Generate a unique ID based on time + self.logger_id = datetime.now().strftime("%Y%m%d_%H%M%S") # Create log directory if it doesn't exist self.logger_full_dir = Path(self.logger_root_dir).joinpath(logger_name) From 6ce6a9ade459530abe35942e44101f5307683ceb Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 4 Dec 2024 15:55:43 +0100 Subject: [PATCH 04/18] feat: Add logging to all routes --- policyengine_api/routes/economy_routes.py | 5 +++++ policyengine_api/routes/simulation_analysis_routes.py | 7 ++++++- policyengine_api/routes/tracer_analysis_routes.py | 6 ++++++ policyengine_api/utils/logger.py | 3 +-- 4 files changed, 18 insertions(+), 3 deletions(-) diff --git a/policyengine_api/routes/economy_routes.py b/policyengine_api/routes/economy_routes.py index a1e922b4..909f05b2 100644 --- a/policyengine_api/routes/economy_routes.py +++ b/policyengine_api/routes/economy_routes.py @@ -5,14 +5,17 @@ from policyengine_api.constants import COUNTRY_PACKAGE_VERSIONS from flask import request, Response import json +from policyengine_api.utils import Logger economy_bp = 
Blueprint("economy", __name__) economy_service = EconomyService() +logger = Logger() @validate_country @economy_bp.route("//over/", methods=["GET"]) def get_economic_impact(country_id, policy_id, baseline_policy_id): + logger.log(f"GET request received for get_economic_impact, in {country_id}, policy {policy_id} over {baseline_policy_id}") policy_id = int(policy_id or get_current_law_policy_id(country_id)) baseline_policy_id = int( @@ -43,6 +46,8 @@ def get_economic_impact(country_id, policy_id, baseline_policy_id): ) return result except Exception as e: + logger.log(f"Error within get_economic_impact, country {country_id}, policy {policy_id} over {baseline_policy_id}") + logger.log(str(e)) return Response( { "status": "error", diff --git a/policyengine_api/routes/simulation_analysis_routes.py b/policyengine_api/routes/simulation_analysis_routes.py index 5326989a..e4c54ff6 100644 --- a/policyengine_api/routes/simulation_analysis_routes.py +++ b/policyengine_api/routes/simulation_analysis_routes.py @@ -8,15 +8,18 @@ validate_sim_analysis_payload, validate_country, ) +from policyengine_api.utils.logger import Logger simulation_analysis_bp = Blueprint("simulation_analysis", __name__) simulation_analysis_service = SimulationAnalysisService() +logger = Logger() + @simulation_analysis_bp.route("", methods=["POST"]) @validate_country def execute_simulation_analysis(country_id): - print("Got POST request for simulation analysis") + logger.log("Got POST request for simulation analysis") # Pop items from request payload and validate # where necessary @@ -68,6 +71,8 @@ def execute_simulation_analysis(country_id): return response except Exception as e: + logger.log("Error while executing simulation analysis") + logger.log(str(e)) return Response( json.dumps( { diff --git a/policyengine_api/routes/tracer_analysis_routes.py b/policyengine_api/routes/tracer_analysis_routes.py index d15b0dd4..a35c68ff 100644 --- a/policyengine_api/routes/tracer_analysis_routes.py +++ b/policyengine_api/routes/tracer_analysis_routes.py @@ -3,6 +3,7 @@ validate_country, validate_tracer_analysis_payload, ) +from policyengine_api.utils.logger import Logger from policyengine_api.services.tracer_analysis_service import ( TracerAnalysisService, ) @@ -11,10 +12,12 @@ tracer_analysis_bp = Blueprint("tracer_analysis", __name__) tracer_analysis_service = TracerAnalysisService() +logger = Logger() @tracer_analysis_bp.route("", methods=["POST"]) @validate_country def execute_tracer_analysis(country_id): + logger.log("Got POST request for tracer analysis") payload = request.json @@ -51,6 +54,7 @@ def execute_tracer_analysis(country_id): """ This exception is raised when the tracer can't find a household tracer record """ + logger.log(f"No household simulation tracer found in {country_id}, household {household_id}, policy {policy_id}, variable {variable}") return Response( json.dumps( { @@ -62,6 +66,8 @@ def execute_tracer_analysis(country_id): 404, ) except Exception as e: + logger.log(f"Error while executing tracer analysis in {country_id}, household {household_id}, policy {policy_id}, variable {variable}") + logger.log(str(e)) return Response( json.dumps( { diff --git a/policyengine_api/utils/logger.py b/policyengine_api/utils/logger.py index 8e0fe72d..2d6ec9d7 100644 --- a/policyengine_api/utils/logger.py +++ b/policyengine_api/utils/logger.py @@ -11,7 +11,6 @@ import os from weakref import proxy import signal -import uuid class Logger: @@ -90,7 +89,7 @@ def __init__( # Google Cloud Logging handler; don't log to GCP if in debug if 
log_to_cloud and os.environ.get("FLASK_DEBUG") != "1": try: - cloud_client = google.cloud.logging.Client() + cloud_client = cloud_logging.Client() except Exception as e: print(f"Google Cloud Logging error: {str(e)}") return From 7747bc74af36e5b68b3af155a5a171bf56b2b210 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 4 Dec 2024 17:11:21 +0100 Subject: [PATCH 05/18] feat: Add logging to services; create Logger.error() method --- policyengine_api/routes/economy_routes.py | 10 +- .../routes/simulation_analysis_routes.py | 5 +- .../routes/tracer_analysis_routes.py | 10 +- .../services/ai_analysis_service.py | 8 ++ policyengine_api/services/economy_service.py | 26 +++-- policyengine_api/services/policy_service.py | 9 +- .../services/reform_impacts_service.py | 24 ++++- .../services/simulation_analysis_service.py | 12 ++- .../services/tracer_analysis_service.py | 17 +++- policyengine_api/utils/logger.py | 97 +++++++++++-------- 10 files changed, 145 insertions(+), 73 deletions(-) diff --git a/policyengine_api/routes/economy_routes.py b/policyengine_api/routes/economy_routes.py index 909f05b2..ea59cc5f 100644 --- a/policyengine_api/routes/economy_routes.py +++ b/policyengine_api/routes/economy_routes.py @@ -12,10 +12,13 @@ logger = Logger() + @validate_country @economy_bp.route("//over/", methods=["GET"]) def get_economic_impact(country_id, policy_id, baseline_policy_id): - logger.log(f"GET request received for get_economic_impact, in {country_id}, policy {policy_id} over {baseline_policy_id}") + logger.log( + f"GET request received for get_economic_impact, in {country_id}, policy {policy_id} over {baseline_policy_id}" + ) policy_id = int(policy_id or get_current_law_policy_id(country_id)) baseline_policy_id = int( @@ -46,8 +49,9 @@ def get_economic_impact(country_id, policy_id, baseline_policy_id): ) return result except Exception as e: - logger.log(f"Error within get_economic_impact, country {country_id}, policy {policy_id} over {baseline_policy_id}") - logger.log(str(e)) + logger.error( + f"Error within get_economic_impact, country {country_id}, policy {policy_id} over {baseline_policy_id}; details: {str(e)}" + ) return Response( { "status": "error", diff --git a/policyengine_api/routes/simulation_analysis_routes.py b/policyengine_api/routes/simulation_analysis_routes.py index e4c54ff6..38d006a9 100644 --- a/policyengine_api/routes/simulation_analysis_routes.py +++ b/policyengine_api/routes/simulation_analysis_routes.py @@ -71,8 +71,9 @@ def execute_simulation_analysis(country_id): return response except Exception as e: - logger.log("Error while executing simulation analysis") - logger.log(str(e)) + logger.error( + f"Error while executing simulation analysis; details: {str(e)}" + ) return Response( json.dumps( { diff --git a/policyengine_api/routes/tracer_analysis_routes.py b/policyengine_api/routes/tracer_analysis_routes.py index a35c68ff..758ed6eb 100644 --- a/policyengine_api/routes/tracer_analysis_routes.py +++ b/policyengine_api/routes/tracer_analysis_routes.py @@ -14,6 +14,7 @@ logger = Logger() + @tracer_analysis_bp.route("", methods=["POST"]) @validate_country def execute_tracer_analysis(country_id): @@ -54,7 +55,9 @@ def execute_tracer_analysis(country_id): """ This exception is raised when the tracer can't find a household tracer record """ - logger.log(f"No household simulation tracer found in {country_id}, household {household_id}, policy {policy_id}, variable {variable}") + logger.log( + f"No household simulation tracer found in {country_id}, household {household_id}, policy 
{policy_id}, variable {variable}" + ) return Response( json.dumps( { @@ -66,8 +69,9 @@ def execute_tracer_analysis(country_id): 404, ) except Exception as e: - logger.log(f"Error while executing tracer analysis in {country_id}, household {household_id}, policy {policy_id}, variable {variable}") - logger.log(str(e)) + logger.error( + f"Error while executing tracer analysis in {country_id}, household {household_id}, policy {policy_id}, variable {variable}; details: {str(e)}" + ) return Response( json.dumps( { diff --git a/policyengine_api/services/ai_analysis_service.py b/policyengine_api/services/ai_analysis_service.py index 9e6556a0..0a859491 100644 --- a/policyengine_api/services/ai_analysis_service.py +++ b/policyengine_api/services/ai_analysis_service.py @@ -4,6 +4,9 @@ import time from typing import Generator, Optional from policyengine_api.data import local_database +from policyengine_api.utils.logger import Logger + +logger = Logger() class AIAnalysisService: @@ -19,6 +22,7 @@ def get_existing_analysis( """ Get existing analysis from the local database """ + logger.log(f"Getting existing analysis") analysis = local_database.query( f"SELECT analysis FROM analysis WHERE prompt = ?", @@ -26,6 +30,7 @@ def get_existing_analysis( ).fetchone() if analysis is None: + logger.log(f"No existing analysis found") return None def generate(): @@ -46,6 +51,7 @@ def generate(): return generate() def trigger_ai_analysis(self, prompt: str) -> Generator[str, None, None]: + logger.log("Triggering AI analysis") # Configure a Claude client claude_client = anthropic.Anthropic( @@ -83,6 +89,8 @@ def generate(): if buffer: yield json.dumps({"stream": buffer}) + "\n" + logger.log("Updating analysis record") + # Finally, update the analysis record and return local_database.query( f"INSERT INTO analysis (prompt, analysis, status) VALUES (?, ?, ?)", diff --git a/policyengine_api/services/economy_service.py b/policyengine_api/services/economy_service.py index 66beaceb..e91abfca 100644 --- a/policyengine_api/services/economy_service.py +++ b/policyengine_api/services/economy_service.py @@ -3,6 +3,7 @@ from policyengine_api.services.reform_impacts_service import ( ReformImpactsService, ) +from policyengine_api.utils.logger import Logger from policyengine_api.data import local_database, database import json import datetime @@ -11,6 +12,8 @@ job_service = JobService() reform_impacts_service = ReformImpactsService() +logger = Logger() + class EconomyService: """ @@ -36,13 +39,15 @@ def get_economic_impact( options_hash = ( "[" + "&".join([f"{k}={v}" for k, v in options.items()]) + "]" ) - print("Checking if already calculated") + logger.log( + f"Checking if {policy_id} over {baseline_policy_id} in {country_id}, region {region}, dataset {dataset} already calculated" + ) # Create job ID job_id = f"reform_impact_{country_id}_{policy_id}_{baseline_policy_id}_{region}_{dataset}_{time_period}_{options_hash}_{api_version}" # First, check if already calculated - print("Checking previous impacts...") + logger.log("Checking previous impacts...") previous_impacts = self._get_previous_impacts( country_id, policy_id, @@ -53,14 +58,14 @@ def get_economic_impact( options_hash, api_version, ) - print(f"Found {len(previous_impacts)} previous impacts") - print( + logger.log(f"Found {len(previous_impacts)} previous impacts") + logger.log( f"Previous impacts status: {[imp.get('status') for imp in previous_impacts]}" ) if len(previous_impacts) == 0: # Add job to recent job list - print("No previous impacts found, creating new job...") + 
logger.log("No previous impacts found, creating new job...") job_service.add_recent_job( job_id=job_id, type="calculate_economy_simulation", @@ -82,7 +87,7 @@ def get_economic_impact( ) # Get baseline and reform policy - print("Fetching baseline and reform policies") + logger.log("Fetching baseline and reform policies") baseline_policy = policy_service.get_policy_json( country_id, baseline_policy_id ) @@ -91,7 +96,7 @@ def get_economic_impact( ) # Enqueue job - print("Enqueuing job") + logger.log("Enqueuing job") job_service.execute_job( type="calculate_economy_simulation", baseline_policy_id=baseline_policy_id, @@ -117,7 +122,7 @@ def get_economic_impact( 200, ) else: - print( + logger.log( f"Found previous impacts, first status: {previous_impacts[0]['status']}" ) ok_results = [ @@ -167,7 +172,7 @@ def get_economic_impact( ) except Exception as e: - print(f"Error getting economic impact: {str(e)}") + logger.error(f"Error getting economic impact: {str(e)}") raise e def _get_previous_impacts( @@ -213,6 +218,7 @@ def _set_impact_computing( options_hash, api_version, ): + logger.log("Setting impact computing record") try: reform_impacts_service.set_reform_impact( country_id, @@ -229,5 +235,5 @@ def _set_impact_computing( datetime.datetime.now(), ) except Exception as e: - print(f"Error inserting computing record: {str(e)}") + logger.error(f"Error inserting computing record: {str(e)}") raise e diff --git a/policyengine_api/services/policy_service.py b/policyengine_api/services/policy_service.py index 0a820f83..abdca68b 100644 --- a/policyengine_api/services/policy_service.py +++ b/policyengine_api/services/policy_service.py @@ -1,4 +1,7 @@ from policyengine_api.data import database +from policyengine_api.utils.logger import Logger + +logger = Logger() class PolicyService: @@ -9,6 +12,10 @@ class PolicyService: """ def get_policy_json(self, country_id, policy_id): + logger.log( + f"Getting policy json for country {country_id}, policy {policy_id}" + ) + try: policy_json = database.query( f"SELECT policy_json FROM policy WHERE country_id = ? 
AND id = ?", @@ -16,5 +23,5 @@ def get_policy_json(self, country_id, policy_id): ).fetchone()["policy_json"] return policy_json except Exception as e: - print(f"Error getting policy json: {str(e)}") + logger.error(f"Error getting policy json: {str(e)}") raise e diff --git a/policyengine_api/services/reform_impacts_service.py b/policyengine_api/services/reform_impacts_service.py index c6bb2d06..6b817c62 100644 --- a/policyengine_api/services/reform_impacts_service.py +++ b/policyengine_api/services/reform_impacts_service.py @@ -1,6 +1,9 @@ from policyengine_api.data import local_database +from policyengine_api.utils.logger import Logger import datetime +logger = Logger() + class ReformImpactsService: """ @@ -20,6 +23,9 @@ def get_all_reform_impacts( options_hash, api_version, ): + logger.log( + f"Getting all reform impacts for country {country_id}, policy {policy_id}, baseline {baseline_policy_id}, region {region}, dataset {dataset}" + ) try: query = ( "SELECT reform_impact_json, status, message, start_time FROM " @@ -41,7 +47,7 @@ def get_all_reform_impacts( ), ).fetchall() except Exception as e: - print(f"Error getting all reform impacts: {str(e)}") + logger.error(f"Error getting all reform impacts: {str(e)}") raise e def set_reform_impact( @@ -59,6 +65,9 @@ def set_reform_impact( reform_impact_json, start_time, ): + logger.log( + f"Setting reform impact record for country {country_id}, policy {policy_id}, baseline {baseline_policy_id}, region {region}, dataset {dataset}" + ) try: query = ( "INSERT INTO reform_impact (country_id, reform_policy_id, baseline_policy_id, " @@ -83,7 +92,7 @@ def set_reform_impact( ), ) except Exception as e: - print(f"Error setting reform impact: {str(e)}") + logger.error(f"Error setting reform impact: {str(e)}") raise e def delete_reform_impact( @@ -96,6 +105,10 @@ def delete_reform_impact( time_period, options_hash, ): + logger.log( + f"Deleteing reform impact for country {country_id}, policy {policy_id}, baseline {baseline_policy_id}, region {region}, dataset {dataset}" + ) + try: query = ( "DELETE FROM reform_impact WHERE country_id = ? 
AND " @@ -117,7 +130,7 @@ def delete_reform_impact( ), ) except Exception as e: - print(f"Error deleting reform impact: {str(e)}") + logger.error(f"Error deleting reform impact: {str(e)}") raise e def set_error_reform_impact( @@ -156,7 +169,7 @@ def set_error_reform_impact( ), ) except Exception as e: - print( + logger.error( f"Error setting error reform impact (something must be REALLY wrong): {str(e)}" ) raise e @@ -172,6 +185,7 @@ def set_complete_reform_impact( options_hash, reform_impact_json, ): + logger.log("Setting completed reform impact") try: query = ( "UPDATE reform_impact SET status = ?, message = ?, end_time = ?, " @@ -199,5 +213,5 @@ def set_complete_reform_impact( ), ) except Exception as e: - print(f"Error setting completed reform impact: {str(e)}") + logger.error(f"Error setting completed reform impact: {str(e)}") raise e diff --git a/policyengine_api/services/simulation_analysis_service.py b/policyengine_api/services/simulation_analysis_service.py index 2e5c31b2..0f1aac4e 100644 --- a/policyengine_api/services/simulation_analysis_service.py +++ b/policyengine_api/services/simulation_analysis_service.py @@ -1,6 +1,9 @@ import json from policyengine_api.services.ai_analysis_service import AIAnalysisService +from policyengine_api.utils.logger import Logger + +logger = Logger() class SimulationAnalysisService(AIAnalysisService): @@ -31,7 +34,7 @@ def execute_analysis( # Check if the region is enhanced_cps is_enhanced_cps = "enhanced_us" in region - print("Generating prompt for economy-wide simulation analysis") + logger.log(f"Generating prompt for economy-wide simulation analysis") # Create prompt based on data prompt = self._generate_simulation_analysis_prompt( @@ -51,14 +54,14 @@ def execute_analysis( # Add audience description to end prompt += self.audience_descriptions[audience] - print("Checking if AI analysis already exists for this prompt") + logger.log("Checking if AI analysis already exists for this prompt") # If a calculated record exists for this prompt, return it as a # streaming response existing_analysis = self.get_existing_analysis(prompt) if existing_analysis is not None: return existing_analysis - print( + logger.log( "Found no existing AI analysis; triggering new analysis with Claude" ) # Otherwise, pass prompt to Claude, then return streaming function @@ -66,6 +69,9 @@ def execute_analysis( analysis = self.trigger_ai_analysis(prompt) return analysis except Exception as e: + logger.error( + f"Error while triggering AI analysis; details: {str(e)}" + ) raise e def _generate_simulation_analysis_prompt( diff --git a/policyengine_api/services/tracer_analysis_service.py b/policyengine_api/services/tracer_analysis_service.py index 162245d2..82c8f85b 100644 --- a/policyengine_api/services/tracer_analysis_service.py +++ b/policyengine_api/services/tracer_analysis_service.py @@ -5,6 +5,9 @@ import re import anthropic from policyengine_api.services.ai_analysis_service import AIAnalysisService +from policyengine_api.utils.logger import Logger + +logger = Logger() class TracerAnalysisService(AIAnalysisService): @@ -19,6 +22,10 @@ def execute_analysis( variable: str, ): + logger.log( + f"Generating tracer analysis for household {household_id}, policy {policy_id}, variable {variable} in {country_id}" + ) + api_version = COUNTRY_PACKAGE_VERSIONS[country_id] # Retrieve tracer record from table @@ -30,6 +37,7 @@ def execute_analysis( api_version, ) except Exception as e: + logger.error(f"Error retrieving tracer record: {str(e)}") raise e # Parse the tracer output for our 
given variable @@ -38,7 +46,7 @@ def execute_analysis( tracer, variable ) except Exception as e: - print(f"Error parsing tracer output: {str(e)}") + logger.error(f"Error parsing tracer output: {str(e)}") raise e # Add the parsed tracer output to the prompt @@ -57,7 +65,7 @@ def execute_analysis( analysis: Generator = self.trigger_ai_analysis(prompt) return analysis except Exception as e: - print( + logger.error( f"Error generating AI analysis within tracer analysis service: {str(e)}" ) raise e @@ -69,6 +77,7 @@ def get_tracer( policy_id: str, api_version: str, ) -> list: + logger.log("Getting existing tracer analysis from tracers table") try: # Retrieve from the tracers table in the local database row = local_database.query( @@ -86,10 +95,12 @@ def get_tracer( return tracer_output_list except Exception as e: - print(f"Error getting existing tracer analysis: {str(e)}") + logger.error(f"Error getting existing tracer analysis: {str(e)}") raise e def _parse_tracer_output(self, tracer_output, target_variable): + logger.log("Parsing tracer output for target variable") + result = [] target_indent = None capturing = False diff --git a/policyengine_api/utils/logger.py b/policyengine_api/utils/logger.py index 2d6ec9d7..754b5ba7 100644 --- a/policyengine_api/utils/logger.py +++ b/policyengine_api/utils/logger.py @@ -34,7 +34,10 @@ def __init__( """ # Check if running in debug; if so, don't initialize before Werkzeug, # otherwise we'll generate two log files, one which will be empty - if os.environ.get("FLASK_DEBUG") == "1" and os.environ.get("WERKZEUG_RUN_MAIN") != "true": + if ( + os.environ.get("FLASK_DEBUG") == "1" + and os.environ.get("WERKZEUG_RUN_MAIN") != "true" + ): print("Skipping logger initialization in debug mode pre-Werkzeug") return @@ -68,7 +71,9 @@ def __init__( self.log_file = self.logger_full_dir / f"{self.logger_id}.log" file_handler = logging.FileHandler(str(self.log_file)) file_handler.setLevel(logging.INFO) - print(f"Logging to file: logs/{self.logger_name}/{self.logger_id}.log") + print( + f"Logging to file: logs/{self.logger_name}/{self.logger_id}.log" + ) file_handler.setFormatter( logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" @@ -96,7 +101,9 @@ def __init__( cloud_handler = cloud_logging.handlers.CloudLoggingHandler( cloud_client, name=f"{self.logger_name}" ) - cloud_handler = cloud_logging.handlers.CloudLoggingHandler(cloud_client) + cloud_handler = cloud_logging.handlers.CloudLoggingHandler( + cloud_client + ) cloud_handler.setLevel(logging.INFO) cloud_handler.setFormatter( logging.Formatter("%(levelname)s - %(message)s") @@ -122,6 +129,10 @@ def log(self, message, level="info", **context): log_func = getattr(self.logger, level.lower()) log_func(message) + def error(self, message, **context): + """Convenience method to log an error message""" + self.log(message, level="error", **context) + # class WorkerLogger: # @staticmethod @@ -137,7 +148,7 @@ def log(self, message, level="info", **context): # current_job = get_current_job() # if current_job and current_job.worker_name: # return current_job.worker_name -# +# # # Try to get from current worker # try: # worker = Worker.find_by_key( @@ -147,10 +158,10 @@ def log(self, message, level="info", **context): # return worker.name # except: # pass -# +# # # Default to timestamp if no other ID found # return datetime.now().strftime("%Y%m%d_%H%M%S") -# +# # def __init__( # self, # worker_id=None, @@ -163,7 +174,7 @@ def log(self, message, level="info", **context): # ): # """ # Initialize logger with 
automatic worker ID detection if none provided -# +# # Args: # worker_id (str): Optional worker ID # log_to_cloud (bool): Whether to log to Google Cloud Logging @@ -175,9 +186,9 @@ def log(self, message, level="info", **context): # self.worker_id = worker_id or self.get_worker_id() # self.logger = logging.getLogger(f"worker_{self.worker_id}") # self.logger.setLevel(logging.INFO) -# +# # self.log_dir = Path(log_dir) -# +# # # Create log directory if it doesn't exist # self.log_dir = Path(log_dir) # try: @@ -188,7 +199,7 @@ def log(self, message, level="info", **context): # ) # # Fall back to current directory # self.log_dir = Path(".") -# +# # self.memory_monitor = None # if monitor_memory: # self.memory_monitor = MemoryMonitor( @@ -196,11 +207,11 @@ def log(self, message, level="info", **context): # threshold_percent=memory_threshold, # check_interval=memory_check_interval, # ) -# +# # self.cloud_client = None # self.cloud_logger = None # print(f"Initialized worker logger with ID: {self.worker_id}") -# +# # # Prevent duplicate handlers # if not self.logger.handlers: # # File handler - logs to local file @@ -213,7 +224,7 @@ def log(self, message, level="info", **context): # ) # ) # self.logger.addHandler(file_handler) -# +# # # Console handler # console_handler = logging.StreamHandler(sys.stdout) # console_handler.setFormatter( @@ -222,7 +233,7 @@ def log(self, message, level="info", **context): # ) # ) # self.logger.addHandler(console_handler) -# +# # # Google Cloud Logging handler # if log_to_cloud: # cloud_client = cloud_logging.Client() @@ -233,7 +244,7 @@ def log(self, message, level="info", **context): # logging.Formatter("%(levelname)s - %(message)s") # ) # self.logger.addHandler(cloud_handler) -# +# # def log(self, message, level="info", **context): # """ # Log a message with optional context data @@ -242,15 +253,15 @@ def log(self, message, level="info", **context): # current_job = get_current_job() # if current_job: # context["job_id"] = current_job.id -# +# # # Format message with context if provided # if context: # context_str = " ".join(f"{k}={v}" for k, v in context.items()) # message = f"{message} | {context_str}" -# +# # log_func = getattr(self.logger, level.lower()) # log_func(message) -# +# # def log_memory_stats( # self, process_memory_mb, process_percent, system_percent # ): @@ -263,28 +274,28 @@ def log(self, message, level="info", **context): # process_percent=round(process_percent, 2), # system_percent=round(system_percent, 2), # ) -# +# # def log_memory_warning(self, message, **context): # """Log memory warning""" # self.log( # message, level="warning", metric_type="memory_warning", **context # ) -# +# # def __enter__(self): # """Context manager entry""" # return self -# +# # def __exit__(self, exc_type, exc_val, exc_tb): # """Context manager exit - ensure cleanup""" # if self.memory_monitor: # self.memory_monitor.stop() -# -# +# +# # class MemoryMonitor: # def __init__(self, threshold_percent=90, check_interval=5, logger=None): # """ # Initialize memory monitor -# +# # Args: # threshold_percent (int): Memory usage threshold to trigger warnings (default: 75%) # check_interval (int): How often to check memory in seconds (default: 5) @@ -295,30 +306,30 @@ def log(self, message, level="info", **context): # self.monitor_thread: Optional[threading.Thread] = None # self.logger = proxy(logger) # self._pid = os.getpid() -# +# # def start(self): # """Start memory monitoring in a separate thread""" # self.stop_flag.clear() # self._pid = os.getpid() -# +# # self.monitor_thread 
= threading.Thread(target=self._monitor_memory) # self.monitor_thread.daemon = True # self.monitor_thread.start() -# +# # self._setup_signal_handlers() -# +# # def stop(self): # """Stop memory monitoring""" # if self.monitor_thread and self.monitor_thread.is_alive(): # self.stop_flag.set() # self.monitor_thread.join(timeout=1.0) -# +# # def _setup_signal_handlers(self): # """Setup signal handlers to stop monitoring""" -# +# # for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT): # signal.signal(sig, self._handle_signal) -# +# # def _handle_signal(self, signum, frame): # """Signal handler to stop monitoring""" # self.logger.log( @@ -326,20 +337,20 @@ def log(self, message, level="info", **context): # level="critical", # ) # self.stop() -# +# # def _monitor_memory(self): # """Memory monitoring loop""" # process = psutil.Process() # while not self.stop_flag.is_set(): # try: -# +# # if os.getpid() != self._pid: # self.logger.log( # "Memory monitor detected PID mismatch, stopping", # level="warning", # ) # break -# +# # try: # process = psutil.Process(self._pid) # except psutil.NoSuchProcess: @@ -348,14 +359,14 @@ def log(self, message, level="info", **context): # level="warning", # ) # break -# +# # if not process.is_running(): # self.logger.log( # "Memory monitor detected process stopped, stopping", # level="warning", # ) # break -# +# # try: # # Get memory info # memory_info = process.memory_info() @@ -367,25 +378,25 @@ def log(self, message, level="info", **context): # error_type=type(e).__name__, # ) # break -# +# # # Calculate usage percentages # process_percent = (memory_info.rss / system_memory.total) * 100 # system_percent = system_memory.percent -# +# # # Log memory stats # self.logger.log_memory_stats( # process_memory_mb=memory_info.rss / (1024 * 1024), # process_percent=process_percent, # system_percent=system_percent, # ) -# +# # # Check for high memory usage # if system_percent > self.threshold_percent: # self.logger.log_memory_warning( # f"High system memory usage: {system_percent:.1f}%", # system_percent=system_percent, # ) -# +# # if process_percent > ( # self.threshold_percent / 2 # ): # Process threshold at half of system @@ -393,12 +404,12 @@ def log(self, message, level="info", **context): # f"High process memory usage: {process_percent:.1f}%", # process_percent=process_percent, # ) -# +# # except Exception as e: # self.logger.log( # f"Error monitoring memory: {str(e)}", # level="error", # error_type=type(e).__name__, # ) -# -# time.sleep(self.check_interval) \ No newline at end of file +# +# time.sleep(self.check_interval) From beb651967c36fcfeeacd4767a404bc62285b09b0 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 4 Dec 2024 20:15:56 +0100 Subject: [PATCH 06/18] chore: Add tests --- policyengine_api/utils/logger.py | 43 ++++----- tests/fixtures/logging_fixtures.py | 55 +++++++++++ tests/python/test_logging.py | 145 +++++++++++++++++++++++++++++ 3 files changed, 222 insertions(+), 21 deletions(-) create mode 100644 tests/fixtures/logging_fixtures.py create mode 100644 tests/python/test_logging.py diff --git a/policyengine_api/utils/logger.py b/policyengine_api/utils/logger.py index 754b5ba7..024c2192 100644 --- a/policyengine_api/utils/logger.py +++ b/policyengine_api/utils/logger.py @@ -16,17 +16,17 @@ class Logger: def __init__( self, - logger_root_dir="logs", - logger_name="default", + folder="logs", + name="default", log_to_cloud=True, ): """ Initialize standard logger Three-part filepath: - - log_dir (defaults to "logs") - - logger_name (defaults to 
"default") - - logger_id (unique identifier for this logging session) + - folder (defaults to "logs") + - name (defaults to "default") + - id (unique identifier for this logging session) Args: log_to_cloud (bool): Whether to log to Google Cloud Logging @@ -41,24 +41,27 @@ def __init__( print("Skipping logger initialization in debug mode pre-Werkzeug") return - self.logger_root_dir = logger_root_dir - self.logger_name = logger_name - self.logger = logging.getLogger(logger_name) - self.logger.setLevel(logging.INFO) + # Generate three parts of storage path + self.folder = folder + self.name = name + self.dir = Path(self.folder).joinpath(self.name) + self.id = datetime.now().strftime("%Y%m%d_%H%M%S") - # Generate a unique ID based on time - self.logger_id = datetime.now().strftime("%Y%m%d_%H%M%S") + self.logger = logging.getLogger(self.name) + self.logger.setLevel(logging.INFO) # Create log directory if it doesn't exist - self.logger_full_dir = Path(self.logger_root_dir).joinpath(logger_name) try: - self.logger_full_dir.mkdir(parents=True, exist_ok=True) + self.dir.mkdir(parents=True, exist_ok=True) except Exception as e: print( - f"Warning: Could not create log directory {self.logger_full_dir}: {str(e)}" + f"Warning: Could not create log directory {self.dir}: {str(e)}" ) # Fall back to current directory - self.log_dir = Path(".") + self.dir = Path(".") + + # Create log file path based upon directory + self.filepath = self.dir.joinpath(f"{self.id}.log") self.memory_monitor = None self.cloud_client = None @@ -67,13 +70,11 @@ def __init__( # Prevent duplicate handlers if not self.logger.handlers: + print("Creating new handler") # File handler - logs to local file - self.log_file = self.logger_full_dir / f"{self.logger_id}.log" - file_handler = logging.FileHandler(str(self.log_file)) + file_handler = logging.FileHandler(str(self.filepath)) file_handler.setLevel(logging.INFO) - print( - f"Logging to file: logs/{self.logger_name}/{self.logger_id}.log" - ) + print(f"Logging to file: {self.filepath}") file_handler.setFormatter( logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" @@ -99,7 +100,7 @@ def __init__( print(f"Google Cloud Logging error: {str(e)}") return cloud_handler = cloud_logging.handlers.CloudLoggingHandler( - cloud_client, name=f"{self.logger_name}" + cloud_client, name=f"{self.name}" ) cloud_handler = cloud_logging.handlers.CloudLoggingHandler( cloud_client diff --git a/tests/fixtures/logging_fixtures.py b/tests/fixtures/logging_fixtures.py new file mode 100644 index 00000000..6f9a092d --- /dev/null +++ b/tests/fixtures/logging_fixtures.py @@ -0,0 +1,55 @@ +import pytest +import os +from unittest.mock import patch, Mock +import logging +from datetime import datetime + +TEST_LOGGER_NAME = "test_logger" +TEST_LOGGER_ID = "20240101_120000" + + +@pytest.fixture(autouse=True) +def mock_flask_debug(): + """ + Fixture to set FLASK_DEBUG=0 for all tests; this is + necessary because the logger has special behavior in debug + """ + with patch.dict(os.environ, {"FLASK_DEBUG": "0"}): + yield + + +@pytest.fixture(autouse=True) +def clear_handlers(): + """Fixture to clear handlers before each test""" + logging.getLogger(TEST_LOGGER_NAME).handlers.clear() + yield + logging.getLogger(TEST_LOGGER_NAME).handlers.clear() + + +@pytest.fixture +def temp_log_dir(tmp_path): + """Fixture to create a temporary log directory""" + log_dir = tmp_path / "logs" + log_dir.mkdir(exist_ok=True) # Create the directory + return log_dir + + +@pytest.fixture +def mock_datetime(): + """Fixture to 
mock datetime for consistent id""" + with patch("policyengine_api.utils.logger.datetime") as mock_dt: + mock_dt.now.return_value = datetime(2024, 1, 1, 12, 0, 0) + yield mock_dt + + +@pytest.fixture +def mock_cloud_logging(): + """Fixture to mock Google Cloud Logging""" + with patch( + "policyengine_api.utils.logger.cloud_logging.Client" + ) as mock_client: + mock_handler = Mock() + mock_client.return_value.get_default_handler.return_value = ( + mock_handler + ) + yield mock_client diff --git a/tests/python/test_logging.py b/tests/python/test_logging.py new file mode 100644 index 00000000..1d41341f --- /dev/null +++ b/tests/python/test_logging.py @@ -0,0 +1,145 @@ +import pytest +import logging +from pathlib import Path +import os +from unittest.mock import Mock, patch, MagicMock +from datetime import datetime +from google.cloud import logging as cloud_logging +from policyengine_api.utils.logger import Logger + +from tests.fixtures.logging_fixtures import ( + mock_flask_debug, + clear_handlers, + temp_log_dir, + mock_datetime, + mock_cloud_logging, + TEST_LOGGER_NAME, + TEST_LOGGER_ID, +) + + +class TestLogger: + def test_logger_initialization(self, temp_log_dir, mock_datetime): + """Test basic logger initialization""" + logger = Logger( + folder=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False + ) + + # Check if logger was properly initialized + assert logger.name == TEST_LOGGER_NAME + assert logger.id == TEST_LOGGER_ID + assert logger.logger.level == logging.INFO + + # Verify log directory creation + expected_log_dir = temp_log_dir / TEST_LOGGER_NAME + assert expected_log_dir.exists() + + # Verify log file creation + expected_log_file = expected_log_dir / (TEST_LOGGER_ID + ".log") + assert expected_log_file.exists() + + def test_logger_handlers(self, temp_log_dir): + """Test that appropriate handlers are added""" + logger = Logger( + folder=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False + ) + + # Should have exactly 2 handlers (file and console) when cloud logging is disabled + assert len(logger.logger.handlers) == 2 + + # Verify handler types + handlers = logger.logger.handlers + assert any(isinstance(h, logging.FileHandler) for h in handlers) + assert any(isinstance(h, logging.StreamHandler) for h in handlers) + + def test_cloud_logging_initialization( + self, temp_log_dir, mock_cloud_logging + ): + """Test initialization with cloud logging enabled""" + logger = Logger( + folder=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=True + ) + + # Should have 3 handlers (file, console, and cloud) + assert len(logger.logger.handlers) == 3 + mock_cloud_logging.assert_called_once() + + def test_log_message(self, temp_log_dir, mock_datetime): + """Test logging a message""" + logger = Logger( + folder=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False + ) + + test_message = "Test log message" + logger.log(test_message) + + # Read the log file and verify the message was logged + log_file = temp_log_dir / TEST_LOGGER_NAME / (TEST_LOGGER_ID + ".log") + assert log_file.exists(), f"Log file not found at {log_file}" + with open(log_file, "r") as f: + log_content = f.read() + assert test_message in log_content + + def test_log_with_context(self, temp_log_dir, mock_datetime): + """Test logging a message with context""" + logger = Logger( + folder=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False + ) + + test_message = "Test message" + context = {"user": "test_user", "action": "login"} + logger.log(test_message, **context) + + # Read the log file and 
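
The mock_datetime fixture patches datetime where the logger module looks it up, not in the stdlib. A self-contained sketch of that import-site rule, using a throwaway module named mymod (hypothetical, built inline for the demonstration):

import sys
import types
from datetime import datetime
from unittest.mock import patch

# Build a toy module that does the equivalent of `from datetime import datetime`,
# mirroring how policyengine_api.utils.logger imports it.
mymod = types.ModuleType("mymod")
mymod.datetime = datetime
exec(
    "def stamp():\n    return datetime.now().strftime('%Y%m%d_%H%M%S')",
    mymod.__dict__,
)
sys.modules["mymod"] = mymod

# Patch the name inside the module under test, as the fixture does;
# patching datetime.datetime globally would miss mymod's bound reference.
with patch("mymod.datetime") as mock_dt:
    mock_dt.now.return_value = datetime(2024, 1, 1, 12, 0, 0)
    assert mymod.stamp() == "20240101_120000"
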
verify the message and context were logged + log_file = temp_log_dir / TEST_LOGGER_NAME / (TEST_LOGGER_ID + ".log") + assert log_file.exists(), f"Log file not found at {log_file}" + with open(log_file, "r") as f: + log_content = f.read() + assert test_message in log_content + assert "user=test_user" in log_content + assert "action=login" in log_content + + def test_error_logging(self, temp_log_dir, mock_datetime): + """Test error logging functionality""" + logger = Logger( + folder=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False + ) + + error_message = "Test error message" + logger.error(error_message) + + # Use the exact log file path based on the mocked datetime + log_file = temp_log_dir / TEST_LOGGER_NAME / (TEST_LOGGER_ID + ".log") + assert log_file.exists(), f"Log file not found at {log_file}" + + with open(log_file, "r") as f: + log_content = f.read() + assert ( + error_message in log_content + ), "Error message not found in log content" + assert "ERROR" in log_content, "ERROR level not found in log content" + + def test_debug_mode_behavior(self, temp_log_dir): + """Test logger behavior in debug mode""" + with patch.dict(os.environ, {"FLASK_DEBUG": "1"}): + logger = Logger( + folder=str(temp_log_dir), + name=TEST_LOGGER_NAME, + log_to_cloud=False, + ) + # Logger should not be initialized in debug mode + assert not hasattr(logger, "logger") + + def test_failed_directory_creation(self, temp_log_dir, mock_datetime): + """Test fallback behavior when log directory creation fails""" + with patch("pathlib.Path.mkdir") as mock_mkdir: + mock_mkdir.side_effect = PermissionError("Permission denied") + + logger = Logger( + folder=str(temp_log_dir), + name=TEST_LOGGER_NAME, + log_to_cloud=False, + ) + + # Should fallback to current directory + assert logger.dir == Path(".") From 9a448726003f14eccf4e82f641aa6f4b0c6debca Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 4 Dec 2024 22:53:23 +0100 Subject: [PATCH 07/18] feat: Beginnings of WorkerLogger --- .../jobs/calculate_economy_simulation_job.py | 22 +- policyengine_api/utils/__init__.py | 1 + policyengine_api/utils/logger.py | 292 ------------------ policyengine_api/utils/worker_logger.py | 206 ++++++++++++ 4 files changed, 220 insertions(+), 301 deletions(-) create mode 100644 policyengine_api/utils/worker_logger.py diff --git a/policyengine_api/jobs/calculate_economy_simulation_job.py b/policyengine_api/jobs/calculate_economy_simulation_job.py index c8536876..cce1a9c3 100644 --- a/policyengine_api/jobs/calculate_economy_simulation_job.py +++ b/policyengine_api/jobs/calculate_economy_simulation_job.py @@ -14,12 +14,14 @@ from policyengine_api.endpoints.economy.reform_impact import set_comment_on_job from policyengine_api.constants import COUNTRY_PACKAGE_VERSIONS from policyengine_api.country import COUNTRIES, create_policy_reform +from policyengine_api.utils import WorkerLogger from policyengine_core.simulations import Microsimulation from policyengine_us import Microsimulation from policyengine_uk import Microsimulation reform_impacts_service = ReformImpactsService() +logger = WorkerLogger() class CalculateEconomySimulationJob(BaseJob): @@ -38,7 +40,7 @@ def run( baseline_policy: dict, reform_policy: dict, ): - print(f"Starting CalculateEconomySimulationJob.run") + logger.log(f"Starting CalculateEconomySimulationJob.run") try: # Configure inputs # Note for anyone modifying options_hash: redis-queue treats ":" as a namespace @@ -61,7 +63,7 @@ def run( COUNTRY_PACKAGE_VERSIONS[country_id], ) if any(x["status"] == "ok" for x in 
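
Note that `logger = WorkerLogger()` at module scope runs the logger's side effects (directory creation, cloud client setup) in any process that merely imports the module. A hedged sketch of the lazy alternative, with all names illustrative and plain stdlib logging standing in for WorkerLogger:

import logging
from functools import lru_cache

@lru_cache(maxsize=None)
def get_worker_logger() -> logging.Logger:
    """Build the logger on first use instead of at import time."""
    logger = logging.getLogger("worker")
    if not logger.handlers:  # guard against duplicate handlers
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger

def run_job() -> None:
    # Called inside the job, so setup happens in the worker process that runs it.
    get_worker_logger().info("Starting job")

run_job()
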
existing): - print(f"Job already completed successfully") + logger.log(f"Job already completed successfully") return # Save identifiers for later commenting on processing status @@ -75,7 +77,7 @@ def run( options_hash, ) - print("Checking existing reform impacts...") + logger.log("Checking existing reform impacts...") # Query existing impacts before deleting existing = reform_impacts_service.get_all_reform_impacts( country_id, @@ -87,7 +89,7 @@ def run( options_hash, COUNTRY_PACKAGE_VERSIONS[country_id], ) - print(f"Found {len(existing)} existing impacts before delete") + logger.log(f"Found {len(existing)} existing impacts before delete") # Delete any existing reform impact rows with the same identifiers reform_impacts_service.delete_reform_impact( @@ -100,7 +102,7 @@ def run( options_hash, ) - print("Deleted existing computing impacts") + logger.log("Deleted existing computing impacts") # Insert new reform impact row with status 'computing' reform_impacts_service.set_reform_impact( @@ -123,6 +125,7 @@ def run( comment = lambda x: set_comment_on_job(x, *identifiers) comment("Computing baseline") + logger.log("Computing baseline") # Compute baseline economy baseline_economy = self._compute_economy( @@ -134,6 +137,7 @@ def run( policy_json=baseline_policy, ) comment("Computing reform") + logger.log("Computing reform") # Compute reform economy reform_economy = self._compute_economy( @@ -175,7 +179,7 @@ def run( options_hash, message=traceback.format_exc(), ) - print(f"Error setting reform impact: {str(e)}") + logger.error(f"Error setting reform impact: {str(e)}") raise e def _compute_economy( @@ -222,7 +226,7 @@ def _compute_economy( simulation.delete_arrays("person_weight", time_period) if options.get("target") == "cliff": - print(f"Initialised cliff impact computation") + logger.log(f"Initialised cliff impact computation") return { "status": "ok", "result": self._compute_cliff_impacts(simulation), @@ -282,7 +286,7 @@ def _create_simulation_us( # Second statement provides backwards compatibility option # for running a simulation with the "enhanced_us" region if dataset in DATASETS or region == "enhanced_us": - print(f"Running an enhanced CPS simulation") + logger.log(f"Running an enhanced CPS simulation") from policyengine_us_data import EnhancedCPS_2024 sim_options["dataset"] = EnhancedCPS_2024 @@ -290,7 +294,7 @@ def _create_simulation_us( # Handle region settings; need to be mindful not to place # legacy enhanced_us region in this block if region not in ["us", "enhanced_us"]: - print(f"Filtering US dataset down to region {region}") + logger.log(f"Filtering US dataset down to region {region}") from policyengine_us_data import Pooled_3_Year_CPS_2023 diff --git a/policyengine_api/utils/__init__.py b/policyengine_api/utils/__init__.py index fb39956e..381f2792 100644 --- a/policyengine_api/utils/__init__.py +++ b/policyengine_api/utils/__init__.py @@ -3,3 +3,4 @@ from .singleton import Singleton from .get_current_law import get_current_law_policy_id from .logger import Logger +from .worker_logger import WorkerLogger diff --git a/policyengine_api/utils/logger.py b/policyengine_api/utils/logger.py index 024c2192..6aa547e1 100644 --- a/policyengine_api/utils/logger.py +++ b/policyengine_api/utils/logger.py @@ -1,16 +1,9 @@ import logging import sys -from rq import Worker, get_current_job from google.cloud import logging as cloud_logging from datetime import datetime -import time -import threading -import psutil -from typing import Optional from pathlib import Path import os -from weakref import 
proxy -import signal class Logger: @@ -66,15 +59,12 @@ def __init__( self.memory_monitor = None self.cloud_client = None self.cloud_logger = None - print(f"Initialized logger") # Prevent duplicate handlers if not self.logger.handlers: - print("Creating new handler") # File handler - logs to local file file_handler = logging.FileHandler(str(self.filepath)) file_handler.setLevel(logging.INFO) - print(f"Logging to file: {self.filepath}") file_handler.setFormatter( logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" @@ -119,7 +109,6 @@ def log(self, message, level="info", **context): # Don't log if running in debug and Werkzeug not initialized; # this will prevent duplicate log files if getattr(self, "logger", None) is None: - print("Logger not initialized; skipping log") return # Format message with context if provided @@ -133,284 +122,3 @@ def log(self, message, level="info", **context): def error(self, message, **context): """Convenience method to log an error message""" self.log(message, level="error", **context) - - -# class WorkerLogger: -# @staticmethod -# def get_worker_id(): -# """ -# Attempts to get the worker ID through various methods: -# 1. From current RQ job -# 2. From environment variable -# 3. From RQ worker name -# 4. Generates a default if none found -# """ -# # Try to get from current job context -# current_job = get_current_job() -# if current_job and current_job.worker_name: -# return current_job.worker_name -# -# # Try to get from current worker -# try: -# worker = Worker.find_by_key( -# Worker.worker_key_prefix + current_job.worker_name -# ) -# if worker: -# return worker.name -# except: -# pass -# -# # Default to timestamp if no other ID found -# return datetime.now().strftime("%Y%m%d_%H%M%S") -# -# def __init__( -# self, -# worker_id=None, -# job_id=None, -# log_to_cloud=True, -# log_dir="logs", -# monitor_memory=True, -# memory_threshold=75, -# memory_check_interval=5, -# ): -# """ -# Initialize logger with automatic worker ID detection if none provided -# -# Args: -# worker_id (str): Optional worker ID -# log_to_cloud (bool): Whether to log to Google Cloud Logging -# log_dir (str): Directory to store local log files (defaults to "logs") -# monitor_memory (bool): Whether to monitor memory usage -# memory_threshold (int): Memory usage threshold to trigger warnings (default: 90%) -# memory_check_interval (int): How often to check memory in seconds (default: 5) -# """ -# self.worker_id = worker_id or self.get_worker_id() -# self.logger = logging.getLogger(f"worker_{self.worker_id}") -# self.logger.setLevel(logging.INFO) -# -# self.log_dir = Path(log_dir) -# -# # Create log directory if it doesn't exist -# self.log_dir = Path(log_dir) -# try: -# self.log_dir.mkdir(parents=True, exist_ok=True) -# except Exception as e: -# print( -# f"Warning: Could not create log directory {log_dir}: {str(e)}" -# ) -# # Fall back to current directory -# self.log_dir = Path(".") -# -# self.memory_monitor = None -# if monitor_memory: -# self.memory_monitor = MemoryMonitor( -# logger=self, -# threshold_percent=memory_threshold, -# check_interval=memory_check_interval, -# ) -# -# self.cloud_client = None -# self.cloud_logger = None -# print(f"Initialized worker logger with ID: {self.worker_id}") -# -# # Prevent duplicate handlers -# if not self.logger.handlers: -# # File handler - logs to local file -# log_file = self.log_dir / f"worker_{self.worker_id}.log" -# file_handler = logging.FileHandler(str(log_file)) -# print(f"Logging to file: 
logs/worker_{self.worker_id}.log") -# file_handler.setFormatter( -# logging.Formatter( -# "%(asctime)s - %(name)s - %(levelname)s - %(message)s" -# ) -# ) -# self.logger.addHandler(file_handler) -# -# # Console handler -# console_handler = logging.StreamHandler(sys.stdout) -# console_handler.setFormatter( -# logging.Formatter( -# "%(asctime)s - %(name)s - %(levelname)s - %(message)s" -# ) -# ) -# self.logger.addHandler(console_handler) -# -# # Google Cloud Logging handler -# if log_to_cloud: -# cloud_client = cloud_logging.Client() -# cloud_handler = cloud_logging.handlers.CloudLoggingHandler( -# cloud_client, name=f"worker_{self.worker_id}" -# ) -# cloud_handler.setFormatter( -# logging.Formatter("%(levelname)s - %(message)s") -# ) -# self.logger.addHandler(cloud_handler) -# -# def log(self, message, level="info", **context): -# """ -# Log a message with optional context data -# """ -# # Add job ID to context if available -# current_job = get_current_job() -# if current_job: -# context["job_id"] = current_job.id -# -# # Format message with context if provided -# if context: -# context_str = " ".join(f"{k}={v}" for k, v in context.items()) -# message = f"{message} | {context_str}" -# -# log_func = getattr(self.logger, level.lower()) -# log_func(message) -# -# def log_memory_stats( -# self, process_memory_mb, process_percent, system_percent -# ): -# """Log memory statistics""" -# self.log( -# "Memory usage stats", -# level="info", -# metric_type="memory_usage", -# process_memory_mb=round(process_memory_mb, 2), -# process_percent=round(process_percent, 2), -# system_percent=round(system_percent, 2), -# ) -# -# def log_memory_warning(self, message, **context): -# """Log memory warning""" -# self.log( -# message, level="warning", metric_type="memory_warning", **context -# ) -# -# def __enter__(self): -# """Context manager entry""" -# return self -# -# def __exit__(self, exc_type, exc_val, exc_tb): -# """Context manager exit - ensure cleanup""" -# if self.memory_monitor: -# self.memory_monitor.stop() -# -# -# class MemoryMonitor: -# def __init__(self, threshold_percent=90, check_interval=5, logger=None): -# """ -# Initialize memory monitor -# -# Args: -# threshold_percent (int): Memory usage threshold to trigger warnings (default: 75%) -# check_interval (int): How often to check memory in seconds (default: 5) -# """ -# self.threshold_percent = threshold_percent -# self.check_interval = check_interval -# self.stop_flag = threading.Event() -# self.monitor_thread: Optional[threading.Thread] = None -# self.logger = proxy(logger) -# self._pid = os.getpid() -# -# def start(self): -# """Start memory monitoring in a separate thread""" -# self.stop_flag.clear() -# self._pid = os.getpid() -# -# self.monitor_thread = threading.Thread(target=self._monitor_memory) -# self.monitor_thread.daemon = True -# self.monitor_thread.start() -# -# self._setup_signal_handlers() -# -# def stop(self): -# """Stop memory monitoring""" -# if self.monitor_thread and self.monitor_thread.is_alive(): -# self.stop_flag.set() -# self.monitor_thread.join(timeout=1.0) -# -# def _setup_signal_handlers(self): -# """Setup signal handlers to stop monitoring""" -# -# for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT): -# signal.signal(sig, self._handle_signal) -# -# def _handle_signal(self, signum, frame): -# """Signal handler to stop monitoring""" -# self.logger.log( -# f"Received signal {signum}, stopping memory monitor", -# level="critical", -# ) -# self.stop() -# -# def _monitor_memory(self): -# """Memory monitoring loop""" 
-# process = psutil.Process() -# while not self.stop_flag.is_set(): -# try: -# -# if os.getpid() != self._pid: -# self.logger.log( -# "Memory monitor detected PID mismatch, stopping", -# level="warning", -# ) -# break -# -# try: -# process = psutil.Process(self._pid) -# except psutil.NoSuchProcess: -# self.logger.log( -# "Memory monitor detected missing process, stopping", -# level="warning", -# ) -# break -# -# if not process.is_running(): -# self.logger.log( -# "Memory monitor detected process stopped, stopping", -# level="warning", -# ) -# break -# -# try: -# # Get memory info -# memory_info = process.memory_info() -# system_memory = psutil.virtual_memory() -# except Exception as e: -# self.logger.log( -# f"Error getting memory info: {str(e)}", -# level="error", -# error_type=type(e).__name__, -# ) -# break -# -# # Calculate usage percentages -# process_percent = (memory_info.rss / system_memory.total) * 100 -# system_percent = system_memory.percent -# -# # Log memory stats -# self.logger.log_memory_stats( -# process_memory_mb=memory_info.rss / (1024 * 1024), -# process_percent=process_percent, -# system_percent=system_percent, -# ) -# -# # Check for high memory usage -# if system_percent > self.threshold_percent: -# self.logger.log_memory_warning( -# f"High system memory usage: {system_percent:.1f}%", -# system_percent=system_percent, -# ) -# -# if process_percent > ( -# self.threshold_percent / 2 -# ): # Process threshold at half of system -# self.logger.log_memory_warning( -# f"High process memory usage: {process_percent:.1f}%", -# process_percent=process_percent, -# ) -# -# except Exception as e: -# self.logger.log( -# f"Error monitoring memory: {str(e)}", -# level="error", -# error_type=type(e).__name__, -# ) -# -# time.sleep(self.check_interval) diff --git a/policyengine_api/utils/worker_logger.py b/policyengine_api/utils/worker_logger.py new file mode 100644 index 00000000..48e0faf9 --- /dev/null +++ b/policyengine_api/utils/worker_logger.py @@ -0,0 +1,206 @@ +from policyengine_api.utils.logger import Logger +from rq import Worker, get_current_job +from datetime import datetime +import time +import threading +import psutil +from typing import Optional +import os +from weakref import proxy +import signal + +class WorkerLogger(Logger): + """ + Custom logger for worker processes + """ + def __init__( + self, + folder="logs", + name="worker", + log_to_cloud=True, + worker_id=None, + job_id=None, + monitor_memory=True, + memory_threshold=75, + memory_check_interval=5, + ): + """ + Initialize logger with automatic worker ID detection if none provided + + All args optional + Args: + folder (str): Directory to store log files (defaults to "logs") + name (str): Optional name of the worker; will be found automatically if not provided + log_to_cloud (bool): Whether to log to Google Cloud Logging (defaults to True) + worker_id (str): Optional worker ID + job_id (str): Optional job ID + monitor_memory (bool): Whether to monitor memory usage + memory_threshold (int): Memory usage threshold to trigger warnings (default: 90%) + memory_check_interval (int): How often to check memory in seconds (default: 5) + """ + super().__init__( + name=f"worker_{self.get_worker_id()}", + folder=folder, + log_to_cloud=log_to_cloud, + ) + + self.worker_id = worker_id or self.get_worker_id() + self.memory_monitor = None + if monitor_memory: + self.memory_monitor = MemoryMonitor( + logger=self, + threshold_percent=memory_threshold, + check_interval=memory_check_interval, + ) + + print(f"Initialized worker logger 
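
One detail of the subclass wiring above: anything the base initializer's arguments depend on has to exist before super().__init__ runs. A minimal sketch with illustrative class names and a fixed fallback id:

from typing import Optional

class BaseLogger:
    def __init__(self, name: str = "default"):
        self.name = name

class WorkerLogger(BaseLogger):
    def __init__(self, worker_id: Optional[str] = None):
        # Resolve the id first; the base initializer's `name` argument
        # depends on it, so it must be set before delegating.
        self.worker_id = worker_id or "20241204_120000"
        super().__init__(name=f"worker_{self.worker_id}")

print(WorkerLogger().name)  # worker_20241204_120000
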
with ID: {self.worker_id}") + + @staticmethod + def get_worker_id(): + """ + Attempts to get the worker ID through various methods: + 1. From current RQ job + 2. From environment variable + 3. From RQ worker name + 4. Generates a default if none found + """ + # Try to get from current job context + current_job = get_current_job() + if current_job and current_job.worker_name: + return current_job.worker_name + + # Try to get from current worker + try: + worker = Worker.find_by_key( + Worker.worker_key_prefix + current_job.worker_name + ) + if worker: + return worker.name + except: + pass + + # Default to timestamp if no other ID found + return datetime.now().strftime("%Y%m%d_%H%M%S") + +class MemoryMonitor: + def __init__(self, threshold_percent=90, check_interval=5, logger=None): + """ + Initialize memory monitor + + Args: + threshold_percent (int): Memory usage threshold to trigger warnings (default: 75%) + check_interval (int): How often to check memory in seconds (default: 5) + """ + self.threshold_percent = threshold_percent + self.check_interval = check_interval + self.stop_flag = threading.Event() + self.monitor_thread: Optional[threading.Thread] = None + self.logger = proxy(logger) + self._pid = os.getpid() + + def start(self): + """Start memory monitoring in a separate thread""" + self.stop_flag.clear() + self._pid = os.getpid() + + self.monitor_thread = threading.Thread(target=self._monitor_memory) + self.monitor_thread.daemon = True + self.monitor_thread.start() + + self._setup_signal_handlers() + + def stop(self): + """Stop memory monitoring""" + if self.monitor_thread and self.monitor_thread.is_alive(): + self.stop_flag.set() + self.monitor_thread.join(timeout=1.0) + + def _setup_signal_handlers(self): + """Setup signal handlers to stop monitoring""" + + for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT): + signal.signal(sig, self._handle_signal) + + def _handle_signal(self, signum, frame): + """Signal handler to stop monitoring""" + self.logger.log( + f"Received signal {signum}, stopping memory monitor", + level="critical", + ) + self.stop() + + def _monitor_memory(self): + """Memory monitoring loop""" + process = psutil.Process() + while not self.stop_flag.is_set(): + try: + + if os.getpid() != self._pid: + self.logger.log( + "Memory monitor detected PID mismatch, stopping", + level="warning", + ) + break + + try: + process = psutil.Process(self._pid) + except psutil.NoSuchProcess: + self.logger.log( + "Memory monitor detected missing process, stopping", + level="warning", + ) + break + + if not process.is_running(): + self.logger.log( + "Memory monitor detected process stopped, stopping", + level="warning", + ) + break + + try: + # Get memory info + memory_info = process.memory_info() + system_memory = psutil.virtual_memory() + except Exception as e: + self.logger.log( + f"Error getting memory info: {str(e)}", + level="error", + error_type=type(e).__name__, + ) + break + + # Calculate usage percentages + process_percent = (memory_info.rss / system_memory.total) * 100 + system_percent = system_memory.percent + + # Log memory stats + self.logger.log_memory_stats( + process_memory_mb=memory_info.rss / (1024 * 1024), + process_percent=process_percent, + system_percent=system_percent, + ) + + # Check for high memory usage + if system_percent > self.threshold_percent: + self.logger.log_memory_warning( + f"High system memory usage: {system_percent:.1f}%", + system_percent=system_percent, + ) + + if process_percent > ( + self.threshold_percent / 2 + ): # Process threshold at 
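
The percentages in this loop come directly from psutil counters. A short worked sketch, assuming psutil is installed; the 512 MiB / 16 GiB figures are illustrative:

import psutil

process = psutil.Process()            # the current process
memory_info = process.memory_info()   # rss is resident set size, in bytes
system_memory = psutil.virtual_memory()

process_mb = memory_info.rss / (1024 * 1024)
process_percent = (memory_info.rss / system_memory.total) * 100
system_percent = system_memory.percent

# With e.g. rss = 512 MiB on a 16 GiB machine:
# process_percent = (512 * 1024**2) / (16 * 1024**3) * 100 = 3.125
print(f"{process_mb:.1f} MB, {process_percent:.2f}% of system RAM, "
      f"system at {system_percent:.1f}%")
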
half of system + self.logger.log_memory_warning( + f"High process memory usage: {process_percent:.1f}%", + process_percent=process_percent, + ) + + except Exception as e: + self.logger.log( + f"Error monitoring memory: {str(e)}", + level="error", + error_type=type(e).__name__, + ) + + time.sleep(self.check_interval) From cfdcd6c11fae299ef76fc2ee02717b69230213ef Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Sun, 8 Dec 2024 13:11:36 +0100 Subject: [PATCH 08/18] fix: Remove unstable debug check --- .../jobs/calculate_economy_simulation_job.py | 3 +-- policyengine_api/utils/logger.py | 14 -------------- policyengine_api/utils/worker_logger.py | 5 +++-- 3 files changed, 4 insertions(+), 18 deletions(-) diff --git a/policyengine_api/jobs/calculate_economy_simulation_job.py b/policyengine_api/jobs/calculate_economy_simulation_job.py index cce1a9c3..5d86c895 100644 --- a/policyengine_api/jobs/calculate_economy_simulation_job.py +++ b/policyengine_api/jobs/calculate_economy_simulation_job.py @@ -21,8 +21,6 @@ from policyengine_uk import Microsimulation reform_impacts_service = ReformImpactsService() -logger = WorkerLogger() - class CalculateEconomySimulationJob(BaseJob): def __init__(self): @@ -40,6 +38,7 @@ def run( baseline_policy: dict, reform_policy: dict, ): + logger = WorkerLogger() logger.log(f"Starting CalculateEconomySimulationJob.run") try: # Configure inputs diff --git a/policyengine_api/utils/logger.py b/policyengine_api/utils/logger.py index 6aa547e1..5cd09305 100644 --- a/policyengine_api/utils/logger.py +++ b/policyengine_api/utils/logger.py @@ -25,15 +25,6 @@ def __init__( log_to_cloud (bool): Whether to log to Google Cloud Logging log_root_dir (str): Directory to store local log files (defaults to "logs") """ - # Check if running in debug; if so, don't initialize before Werkzeug, - # otherwise we'll generate two log files, one which will be empty - if ( - os.environ.get("FLASK_DEBUG") == "1" - and os.environ.get("WERKZEUG_RUN_MAIN") != "true" - ): - print("Skipping logger initialization in debug mode pre-Werkzeug") - return - # Generate three parts of storage path self.folder = folder self.name = name @@ -106,11 +97,6 @@ def log(self, message, level="info", **context): Log a message with optional context data """ - # Don't log if running in debug and Werkzeug not initialized; - # this will prevent duplicate log files - if getattr(self, "logger", None) is None: - return - # Format message with context if provided if context: context_str = " ".join(f"{k}={v}" for k, v in context.items()) diff --git a/policyengine_api/utils/worker_logger.py b/policyengine_api/utils/worker_logger.py index 48e0faf9..e26981c9 100644 --- a/policyengine_api/utils/worker_logger.py +++ b/policyengine_api/utils/worker_logger.py @@ -38,13 +38,14 @@ def __init__( memory_threshold (int): Memory usage threshold to trigger warnings (default: 90%) memory_check_interval (int): How often to check memory in seconds (default: 5) """ + self.worker_id = worker_id or self.get_worker_id() + super().__init__( - name=f"worker_{self.get_worker_id()}", + name=f"worker_{self.worker_id}", folder=folder, log_to_cloud=log_to_cloud, ) - self.worker_id = worker_id or self.get_worker_id() self.memory_monitor = None if monitor_memory: self.memory_monitor = MemoryMonitor( From 804f10349250a91a2e34051157957b9bbd790783 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Sun, 8 Dec 2024 13:35:11 +0100 Subject: [PATCH 09/18] fix: Change default logger name to 'api_main' --- policyengine_api/utils/logger.py | 4 ++-- 1 file changed, 2 
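
For context on the removed check: under Flask's debug reloader, Werkzeug imports the app once in the watcher process and again in the serving child, where it sets WERKZEUG_RUN_MAIN=true; initializing in both produces the duplicate, empty log file the old guard was avoiding. A sketch of that detection pattern:

import os

def should_initialize() -> bool:
    """Return False in the reloader's watcher process so side effects
    (like creating a log file) run only in the serving child."""
    debug = os.environ.get("FLASK_DEBUG") == "1"
    in_reloaded_child = os.environ.get("WERKZEUG_RUN_MAIN") == "true"
    return not debug or in_reloaded_child

print(should_initialize())
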
insertions(+), 2 deletions(-) diff --git a/policyengine_api/utils/logger.py b/policyengine_api/utils/logger.py index 5cd09305..69bc3042 100644 --- a/policyengine_api/utils/logger.py +++ b/policyengine_api/utils/logger.py @@ -10,7 +10,7 @@ class Logger: def __init__( self, folder="logs", - name="default", + name="api_main", log_to_cloud=True, ): """ @@ -18,7 +18,7 @@ def __init__( Three-part filepath: - folder (defaults to "logs") - - name (defaults to "default") + - name (defaults to "api_main") - id (unique identifier for this logging session) Args: From bfaba0f8d5486f325a9f64185a2d009fa41bcad2 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Sun, 8 Dec 2024 13:58:26 +0100 Subject: [PATCH 10/18] fix: Improve worker logger naming --- .../jobs/calculate_economy_simulation_job.py | 1 + policyengine_api/utils/logger.py | 20 ++-- policyengine_api/utils/worker_logger.py | 104 ++++++++++-------- 3 files changed, 69 insertions(+), 56 deletions(-) diff --git a/policyengine_api/jobs/calculate_economy_simulation_job.py b/policyengine_api/jobs/calculate_economy_simulation_job.py index 5d86c895..fa6d7035 100644 --- a/policyengine_api/jobs/calculate_economy_simulation_job.py +++ b/policyengine_api/jobs/calculate_economy_simulation_job.py @@ -22,6 +22,7 @@ reform_impacts_service = ReformImpactsService() + class CalculateEconomySimulationJob(BaseJob): def __init__(self): super().__init__() diff --git a/policyengine_api/utils/logger.py b/policyengine_api/utils/logger.py index 69bc3042..3ee78572 100644 --- a/policyengine_api/utils/logger.py +++ b/policyengine_api/utils/logger.py @@ -9,27 +9,27 @@ class Logger: def __init__( self, - folder="logs", + dir="logs", name="api_main", + id=datetime.now().strftime("%Y%m%d_%H%M%S"), log_to_cloud=True, ): """ Initialize standard logger - Three-part filepath: - - folder (defaults to "logs") - - name (defaults to "api_main") - - id (unique identifier for this logging session) + Filepath: + dir/name_id.log Args: + dir (str): Directory to store log files (defaults to "logs") + name (str): Name of the logger (defaults to "api_main") + id (str): ID to append to log file name; if not provided, will use current timestamp log_to_cloud (bool): Whether to log to Google Cloud Logging - log_root_dir (str): Directory to store local log files (defaults to "logs") """ # Generate three parts of storage path - self.folder = folder + self.dir = Path(dir) self.name = name - self.dir = Path(self.folder).joinpath(self.name) - self.id = datetime.now().strftime("%Y%m%d_%H%M%S") + self.id = id self.logger = logging.getLogger(self.name) self.logger.setLevel(logging.INFO) @@ -45,7 +45,7 @@ def __init__( self.dir = Path(".") # Create log file path based upon directory - self.filepath = self.dir.joinpath(f"{self.id}.log") + self.filepath = self.dir.joinpath(f"{self.name}_{self.id}.log") self.memory_monitor = None self.cloud_client = None diff --git a/policyengine_api/utils/worker_logger.py b/policyengine_api/utils/worker_logger.py index e26981c9..b1a0deee 100644 --- a/policyengine_api/utils/worker_logger.py +++ b/policyengine_api/utils/worker_logger.py @@ -9,17 +9,18 @@ from weakref import proxy import signal + class WorkerLogger(Logger): - """ - Custom logger for worker processes - """ - def __init__( + """ + Custom logger for worker processes + """ + + def __init__( self, - folder="logs", + dir="logs", name="worker", + id=None, log_to_cloud=True, - worker_id=None, - job_id=None, monitor_memory=True, memory_threshold=75, memory_check_interval=5, @@ -29,21 +30,28 @@ def __init__( All args 
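
One caveat with `id=datetime.now().strftime(...)` as a default argument: Python evaluates defaults once, at definition time, so every instance would share the import-time timestamp. A minimal demonstration of the pitfall and the None-sentinel form that a later patch in this series switches to:

import time
from datetime import datetime

def stamped_eagerly(ts=datetime.now().strftime("%H%M%S")):
    return ts  # default frozen when the function is defined

def stamped_lazily(ts=None):
    return ts if ts is not None else datetime.now().strftime("%H%M%S")

a = stamped_eagerly()
time.sleep(1.1)
b = stamped_eagerly()
assert a == b  # same value on every call

c = stamped_lazily()
time.sleep(1.1)
d = stamped_lazily()
assert c != d  # evaluated per call
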
optional Args: - folder (str): Directory to store log files (defaults to "logs") - name (str): Optional name of the worker; will be found automatically if not provided + dir (str): Directory to store log files (defaults to "logs") + name (str): Name of the logger (defaults to "worker") + id (str): ID to append to log file name; if not provided, will fetch worker name log_to_cloud (bool): Whether to log to Google Cloud Logging (defaults to True) - worker_id (str): Optional worker ID - job_id (str): Optional job ID - monitor_memory (bool): Whether to monitor memory usage - memory_threshold (int): Memory usage threshold to trigger warnings (default: 90%) + monitor_memory (bool): Whether to monitor memory usage (defaults to True) + memory_threshold (int): Memory usage threshold to trigger warnings (default: 75%) memory_check_interval (int): How often to check memory in seconds (default: 5) """ - self.worker_id = worker_id or self.get_worker_id() + self.dir = dir + self.name = name + self.log_to_cloud = log_to_cloud + + if id is not None: + self.id = id + else: + self.id = self.get_worker_id() super().__init__( - name=f"worker_{self.worker_id}", - folder=folder, - log_to_cloud=log_to_cloud, + dir=self.dir, + name=self.name, + id=self.id, + log_to_cloud=self.log_to_cloud, ) self.memory_monitor = None @@ -54,35 +62,39 @@ def __init__( check_interval=memory_check_interval, ) - print(f"Initialized worker logger with ID: {self.worker_id}") - - @staticmethod - def get_worker_id(): - """ - Attempts to get the worker ID through various methods: - 1. From current RQ job - 2. From environment variable - 3. From RQ worker name - 4. Generates a default if none found - """ - # Try to get from current job context - current_job = get_current_job() - if current_job and current_job.worker_name: - return current_job.worker_name - - # Try to get from current worker - try: - worker = Worker.find_by_key( - Worker.worker_key_prefix + current_job.worker_name - ) - if worker: - return worker.name - except: - pass - - # Default to timestamp if no other ID found - return datetime.now().strftime("%Y%m%d_%H%M%S") - + print(f"Initialized worker logger with ID: {self.id}") + + @staticmethod + def get_worker_id(): + """ + Attempts to get the worker ID through various methods: + 1. From current RQ job + 2. From environment variable + 3. From RQ worker name + 4. 
Generates a default if none found
+        """
+        # Try to get from current job context
+        current_job = get_current_job()
+        print(f"Current job worker name: {current_job.worker_name}")
+        if current_job and current_job.worker_name:
+            return current_job.worker_name
+
+        # Try to get from current worker
+        try:
+            print(f"Current worker name: {Worker.worker_key_prefix}")
+            worker = Worker.find_by_key(
+                Worker.worker_key_prefix + current_job.worker_name
+            )
+            if worker:
+                return worker.name
+        except:
+            pass
+
+        # Default to timestamp if no other ID found
+        print("Returning datetime")
+        return datetime.now().strftime("%Y%m%d_%H%M%S")
+
+
 class MemoryMonitor:
     def __init__(self, threshold_percent=90, check_interval=5, logger=None):
         """
         Initialize memory monitor

From c1c715fedd1fe90da4f10de2e6726161cd91d29c Mon Sep 17 00:00:00 2001
From: Anthony Volk
Date: Mon, 9 Dec 2024 15:55:20 +0100
Subject: [PATCH 11/18] fix: Initialize worker properly upon worker launch

---
 policyengine_api/utils/worker_logger.py | 3 ---
 policyengine_api/worker.py              | 3 +++
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/policyengine_api/utils/worker_logger.py b/policyengine_api/utils/worker_logger.py
index b1a0deee..8e511108 100644
--- a/policyengine_api/utils/worker_logger.py
+++ b/policyengine_api/utils/worker_logger.py
@@ -75,13 +75,11 @@ def get_worker_id():
         """
         # Try to get from current job context
         current_job = get_current_job()
-        print(f"Current job worker name: {current_job.worker_name}")
         if current_job and current_job.worker_name:
             return current_job.worker_name

         # Try to get from current worker
         try:
-            print(f"Current worker name: {Worker.worker_key_prefix}")
             worker = Worker.find_by_key(
                 Worker.worker_key_prefix + current_job.worker_name
             )
@@ -91,7 +89,6 @@ def get_worker_id():
             pass

         # Default to timestamp if no other ID found
-        print("Returning datetime")
         return datetime.now().strftime("%Y%m%d_%H%M%S")


diff --git a/policyengine_api/worker.py b/policyengine_api/worker.py
index 8bbc9841..8c6269a7 100644
--- a/policyengine_api/worker.py
+++ b/policyengine_api/worker.py
@@ -1,6 +1,8 @@
 from redis import Redis
 from rq import Worker

+from policyengine_api.utils.worker_logger import WorkerLogger
+
 # Preload libraries
 import policyengine_uk
 import policyengine_us
@@ -9,4 +11,5 @@

 # Provide the worker with the list of queues (str) to listen to.
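
A sketch of the same wiring with an explicit worker name, assuming a local Redis instance; rq's Worker accepts an optional name argument, which makes the resulting log file name predictable:

from redis import Redis
from rq import Worker

# Naming the worker up front ties the log file to a known identifier.
w = Worker(["default"], connection=Redis(), name="worker_example_1")
print(w.name)  # worker_example_1
# w.work()  # commented out: this starts the blocking work loop
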
w = Worker(["default"], connection=Redis()) +logger = WorkerLogger(id=w.name) w.work() From 113477513c84d3bca08905140615a90ea333bd61 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Mon, 9 Dec 2024 17:36:37 +0100 Subject: [PATCH 12/18] chore: Add type hints --- policyengine_api/utils/logger.py | 21 +++++++++++--------- policyengine_api/utils/worker_logger.py | 26 ++++++++++++------------- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/policyengine_api/utils/logger.py b/policyengine_api/utils/logger.py index 3ee78572..925fa5bd 100644 --- a/policyengine_api/utils/logger.py +++ b/policyengine_api/utils/logger.py @@ -9,10 +9,10 @@ class Logger: def __init__( self, - dir="logs", - name="api_main", - id=datetime.now().strftime("%Y%m%d_%H%M%S"), - log_to_cloud=True, + dir: str = "logs", + name: str = "api_main", + id: str = datetime.now().strftime("%Y%m%d_%H%M%S"), + log_to_cloud: bool = True, ): """ Initialize standard logger @@ -47,9 +47,7 @@ def __init__( # Create log file path based upon directory self.filepath = self.dir.joinpath(f"{self.name}_{self.id}.log") - self.memory_monitor = None - self.cloud_client = None - self.cloud_logger = None + self.cloud_client: cloud_logging.Client = None # Prevent duplicate handlers if not self.logger.handlers: @@ -92,9 +90,14 @@ def __init__( ) self.logger.addHandler(cloud_handler) - def log(self, message, level="info", **context): + def log(self, message: str, level: str = "info", **context): """ Log a message with optional context data + + Args: + message (str): Message to log + level (str): Log level (default: "info") + context (dict): Optional context data to log """ # Format message with context if provided @@ -105,6 +108,6 @@ def log(self, message, level="info", **context): log_func = getattr(self.logger, level.lower()) log_func(message) - def error(self, message, **context): + def error(self, message: str, **context): """Convenience method to log an error message""" self.log(message, level="error", **context) diff --git a/policyengine_api/utils/worker_logger.py b/policyengine_api/utils/worker_logger.py index 8e511108..d93e8124 100644 --- a/policyengine_api/utils/worker_logger.py +++ b/policyengine_api/utils/worker_logger.py @@ -17,13 +17,13 @@ class WorkerLogger(Logger): def __init__( self, - dir="logs", - name="worker", - id=None, - log_to_cloud=True, - monitor_memory=True, - memory_threshold=75, - memory_check_interval=5, + dir: str = "logs", + name: str = "worker", + id: str = None, + log_to_cloud: bool = True, + monitor_memory: bool = True, + memory_threshold: int = 75, + memory_check_interval: int = 5, ): """ Initialize logger with automatic worker ID detection if none provided @@ -54,7 +54,7 @@ def __init__( log_to_cloud=self.log_to_cloud, ) - self.memory_monitor = None + self.memory_monitor: MemoryMonitor = None if monitor_memory: self.memory_monitor = MemoryMonitor( logger=self, @@ -101,12 +101,12 @@ def __init__(self, threshold_percent=90, check_interval=5, logger=None): threshold_percent (int): Memory usage threshold to trigger warnings (default: 75%) check_interval (int): How often to check memory in seconds (default: 5) """ - self.threshold_percent = threshold_percent - self.check_interval = check_interval + self.threshold_percent: int = threshold_percent + self.check_interval: int = check_interval self.stop_flag = threading.Event() self.monitor_thread: Optional[threading.Thread] = None - self.logger = proxy(logger) - self._pid = os.getpid() + self.logger: Logger = proxy(logger) + self._pid: int = os.getpid() def 
start(self):
         """Start memory monitoring in a separate thread"""
         self.stop_flag.clear()
@@ -131,7 +131,7 @@ def _setup_signal_handlers(self):
         for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
             signal.signal(sig, self._handle_signal)

-    def _handle_signal(self, signum, frame):
+    def _handle_signal(self, signum: int, frame):
         """Signal handler to stop monitoring"""
         self.logger.log(
             f"Received signal {signum}, stopping memory monitor",

From 3015bdbde0fe071e3ac32ea3318af98909772c6e Mon Sep 17 00:00:00 2001
From: Anthony Volk
Date: Mon, 9 Dec 2024 18:03:53 +0100
Subject: [PATCH 13/18] chore: Update tests

---
 policyengine_api/utils/logger.py |  7 ++++--
 tests/python/test_logging.py     | 43 +++++++++++++++-----------------
 2 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/policyengine_api/utils/logger.py b/policyengine_api/utils/logger.py
index 925fa5bd..63f4fdc2 100644
--- a/policyengine_api/utils/logger.py
+++ b/policyengine_api/utils/logger.py
@@ -11,7 +11,7 @@ def __init__(
         self,
         dir: str = "logs",
         name: str = "api_main",
-        id: str = datetime.now().strftime("%Y%m%d_%H%M%S"),
+        id: str = None,
         log_to_cloud: bool = True,
     ):
         """
@@ -29,7 +29,10 @@ def __init__(
         # Generate three parts of storage path
         self.dir = Path(dir)
         self.name = name
-        self.id = id
+        # Don't pass datetime to initializer, otherwise time is equal to when class is created
+        self.id = (
+            id if id is not None else datetime.now().strftime("%Y%m%d_%H%M%S")
+        )

         self.logger = logging.getLogger(self.name)
         self.logger.setLevel(logging.INFO)
diff --git a/tests/python/test_logging.py b/tests/python/test_logging.py
index 1d41341f..c7622c4d 100644
--- a/tests/python/test_logging.py
+++ b/tests/python/test_logging.py
@@ -22,7 +22,7 @@ class TestLogger:
     def test_logger_initialization(self, temp_log_dir, mock_datetime):
         """Test basic logger initialization"""
         logger = Logger(
-            folder=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False
+            dir=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False
         )

         # Check if logger was properly initialized
@@ -31,17 +31,19 @@ def test_logger_initialization(self, temp_log_dir, mock_datetime):
         assert logger.logger.level == logging.INFO

         # Verify log directory creation
-        expected_log_dir = temp_log_dir / TEST_LOGGER_NAME
+        expected_log_dir = temp_log_dir
         assert expected_log_dir.exists()

         # Verify log file creation
-        expected_log_file = expected_log_dir / (TEST_LOGGER_ID + ".log")
+        expected_log_file = expected_log_dir / (
+            TEST_LOGGER_NAME + "_" + TEST_LOGGER_ID + ".log"
+        )
         assert expected_log_file.exists()

     def test_logger_handlers(self, temp_log_dir):
         """Test that appropriate handlers are added"""
         logger = Logger(
-            folder=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False
+            dir=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False
         )

         # Should have exactly 2 handlers (file and console) when cloud logging is disabled
@@ -57,7 +59,7 @@ def test_cloud_logging_initialization(
     ):
         """Test initialization with cloud logging enabled"""
         logger = Logger(
-            folder=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=True
+            dir=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=True
         )

         # Should have 3 handlers (file, console, and cloud)
@@ -67,14 +69,16 @@ def test_cloud_logging_initialization(
     def test_log_message(self, temp_log_dir, mock_datetime):
         """Test logging a message"""
         logger = Logger(
-            folder=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False
+            dir=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False
         )

         test_message = "Test log message"
         logger.log(test_message)

         # Read the
log file and verify the message was logged - log_file = temp_log_dir / TEST_LOGGER_NAME / (TEST_LOGGER_ID + ".log") + log_file = temp_log_dir / ( + TEST_LOGGER_NAME + "_" + TEST_LOGGER_ID + ".log" + ) assert log_file.exists(), f"Log file not found at {log_file}" with open(log_file, "r") as f: log_content = f.read() @@ -83,7 +87,7 @@ def test_log_message(self, temp_log_dir, mock_datetime): def test_log_with_context(self, temp_log_dir, mock_datetime): """Test logging a message with context""" logger = Logger( - folder=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False + dir=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False ) test_message = "Test message" @@ -91,7 +95,9 @@ def test_log_with_context(self, temp_log_dir, mock_datetime): logger.log(test_message, **context) # Read the log file and verify the message and context were logged - log_file = temp_log_dir / TEST_LOGGER_NAME / (TEST_LOGGER_ID + ".log") + log_file = temp_log_dir / ( + TEST_LOGGER_NAME + "_" + TEST_LOGGER_ID + ".log" + ) assert log_file.exists(), f"Log file not found at {log_file}" with open(log_file, "r") as f: log_content = f.read() @@ -102,14 +108,16 @@ def test_log_with_context(self, temp_log_dir, mock_datetime): def test_error_logging(self, temp_log_dir, mock_datetime): """Test error logging functionality""" logger = Logger( - folder=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False + dir=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False ) error_message = "Test error message" logger.error(error_message) # Use the exact log file path based on the mocked datetime - log_file = temp_log_dir / TEST_LOGGER_NAME / (TEST_LOGGER_ID + ".log") + log_file = temp_log_dir / ( + TEST_LOGGER_NAME + "_" + TEST_LOGGER_ID + ".log" + ) assert log_file.exists(), f"Log file not found at {log_file}" with open(log_file, "r") as f: @@ -119,24 +127,13 @@ def test_error_logging(self, temp_log_dir, mock_datetime): ), "Error message not found in log content" assert "ERROR" in log_content, "ERROR level not found in log content" - def test_debug_mode_behavior(self, temp_log_dir): - """Test logger behavior in debug mode""" - with patch.dict(os.environ, {"FLASK_DEBUG": "1"}): - logger = Logger( - folder=str(temp_log_dir), - name=TEST_LOGGER_NAME, - log_to_cloud=False, - ) - # Logger should not be initialized in debug mode - assert not hasattr(logger, "logger") - def test_failed_directory_creation(self, temp_log_dir, mock_datetime): """Test fallback behavior when log directory creation fails""" with patch("pathlib.Path.mkdir") as mock_mkdir: mock_mkdir.side_effect = PermissionError("Permission denied") logger = Logger( - folder=str(temp_log_dir), + dir=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False, ) From f3f0264e3cc2bec4350b747997426a62dfcb63e2 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Mon, 9 Dec 2024 18:33:50 +0100 Subject: [PATCH 14/18] fix: Migrate routes to context --- policyengine_api/routes/economy_routes.py | 15 +++++++++++++-- .../routes/tracer_analysis_routes.py | 17 +++++++++++++++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/policyengine_api/routes/economy_routes.py b/policyengine_api/routes/economy_routes.py index ea59cc5f..8c8bb146 100644 --- a/policyengine_api/routes/economy_routes.py +++ b/policyengine_api/routes/economy_routes.py @@ -17,7 +17,12 @@ @economy_bp.route("//over/", methods=["GET"]) def get_economic_impact(country_id, policy_id, baseline_policy_id): logger.log( - f"GET request received for get_economic_impact, in {country_id}, policy 
{policy_id} over {baseline_policy_id}" + f"GET request received for get_economic_impact", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + }, ) policy_id = int(policy_id or get_current_law_policy_id(country_id)) @@ -50,7 +55,13 @@ def get_economic_impact(country_id, policy_id, baseline_policy_id): return result except Exception as e: logger.error( - f"Error within get_economic_impact, country {country_id}, policy {policy_id} over {baseline_policy_id}; details: {str(e)}" + f"Error within get_economic_impact", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "error": str(e), + }, ) return Response( { diff --git a/policyengine_api/routes/tracer_analysis_routes.py b/policyengine_api/routes/tracer_analysis_routes.py index 758ed6eb..7df3f155 100644 --- a/policyengine_api/routes/tracer_analysis_routes.py +++ b/policyengine_api/routes/tracer_analysis_routes.py @@ -56,7 +56,13 @@ def execute_tracer_analysis(country_id): This exception is raised when the tracer can't find a household tracer record """ logger.log( - f"No household simulation tracer found in {country_id}, household {household_id}, policy {policy_id}, variable {variable}" + f"No household simulation tracer found", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "variable": variable, + }, ) return Response( json.dumps( @@ -70,7 +76,14 @@ def execute_tracer_analysis(country_id): ) except Exception as e: logger.error( - f"Error while executing tracer analysis in {country_id}, household {household_id}, policy {policy_id}, variable {variable}; details: {str(e)}" + f"Error while executing tracer analysis", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "variable": variable, + "error": str(e), + }, ) return Response( json.dumps( From 6c66cbc63e6872b5250144596f0a30932c8dffc5 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Mon, 9 Dec 2024 18:47:21 +0100 Subject: [PATCH 15/18] fix: Use context for all services --- policyengine_api/services/economy_service.py | 75 +++++++++++-- policyengine_api/services/job_service.py | 22 +++- policyengine_api/services/policy_service.py | 9 +- .../services/reform_impacts_service.py | 101 ++++++++++++++++-- .../services/simulation_analysis_service.py | 6 +- .../services/tracer_analysis_service.py | 65 +++++++++-- 6 files changed, 251 insertions(+), 27 deletions(-) diff --git a/policyengine_api/services/economy_service.py b/policyengine_api/services/economy_service.py index e91abfca..4b091d53 100644 --- a/policyengine_api/services/economy_service.py +++ b/policyengine_api/services/economy_service.py @@ -40,7 +40,17 @@ def get_economic_impact( "[" + "&".join([f"{k}={v}" for k, v in options.items()]) + "]" ) logger.log( - f"Checking if {policy_id} over {baseline_policy_id} in {country_id}, region {region}, dataset {dataset} already calculated" + f"Checking if economic impact already calculated", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + "api_version": api_version, + }, ) # Create job ID @@ -87,7 +97,14 @@ def get_economic_impact( ) # Get baseline and reform policy - logger.log("Fetching baseline and reform policies") + logger.log( + "Fetching baseline and reform policies", + context={ + "country_id": country_id, + 
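
With log(message, level="info", **context), keyword arguments become a key=value suffix on the message; passing one context= dict, as these routes do, renders the whole mapping under a single key. A reduced sketch of just the formatting step, mirroring the Logger.log body:

def format_log(message: str, **context) -> str:
    if context:
        context_str = " ".join(f"{k}={v}" for k, v in context.items())
        message = f"{message} | {context_str}"
    return message

# Flat keywords: each key becomes its own key=value pair.
print(format_log("GET /economy", country_id="us", policy_id=42))
# GET /economy | country_id=us policy_id=42

# A single context= dict: the mapping is rendered under one key.
print(format_log("GET /economy", context={"country_id": "us", "policy_id": 42}))
# GET /economy | context={'country_id': 'us', 'policy_id': 42}
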
"baseline_policy_id": baseline_policy_id, + "policy_id": policy_id, + }, + ) baseline_policy = policy_service.get_policy_json( country_id, baseline_policy_id ) @@ -96,7 +113,17 @@ def get_economic_impact( ) # Enqueue job - logger.log("Enqueuing job") + logger.log( + "Enqueuing job", + context={ + "baseline_policy_id": baseline_policy_id, + "policy_id": policy_id, + "country_id": country_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + }, + ) job_service.execute_job( type="calculate_economy_simulation", baseline_policy_id=baseline_policy_id, @@ -172,7 +199,20 @@ def get_economic_impact( ) except Exception as e: - logger.error(f"Error getting economic impact: {str(e)}") + logger.error( + f"Error getting economic impact", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + "api_version": api_version, + "error": str(e), + }, + ) raise e def _get_previous_impacts( @@ -218,7 +258,17 @@ def _set_impact_computing( options_hash, api_version, ): - logger.log("Setting impact computing record") + logger.log( + "Setting impact computing record", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + }, + ) try: reform_impacts_service.set_reform_impact( country_id, @@ -235,5 +285,18 @@ def _set_impact_computing( datetime.datetime.now(), ) except Exception as e: - logger.error(f"Error inserting computing record: {str(e)}") + logger.error( + f"Error inserting computing record", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + "api_version": api_version, + "error": str(e), + }, + ) raise e diff --git a/policyengine_api/services/job_service.py b/policyengine_api/services/job_service.py index 721e44ec..3b11bf8a 100644 --- a/policyengine_api/services/job_service.py +++ b/policyengine_api/services/job_service.py @@ -5,10 +5,12 @@ from policyengine_api.jobs import CalculateEconomySimulationJob from datetime import datetime from enum import Enum +from policyengine_api.utils.logger import Logger calc_ec_sim_job = CalculateEconomySimulationJob() queue = Queue(connection=Redis()) +logger = Logger() class JobStatus(Enum): @@ -37,7 +39,7 @@ def execute_job(self, job_id, job_timeout, type, *args, **kwargs): "finished", "failed", ]: - print( + logger.log( f"Job {job_id} already exists and is {existing_job.get_status()}" ) return @@ -59,7 +61,14 @@ def execute_job(self, job_id, job_timeout, type, *args, **kwargs): self._prune_recent_jobs() except Exception as e: - print(f"Error executing job: {str(e)}") + logger.error( + f"Error executing job", + context={ + "job_id": job_id, + "type": type, + "error": str(e), + }, + ) raise e def fetch_job_queue_pos(self, job_id): @@ -72,7 +81,13 @@ def fetch_job_queue_pos(self, job_id): ) return pos except Exception as e: - print(f"Error fetching job queue position: {str(e)}") + logger.error( + f"Error fetching job queue position", + context={ + "job_id": job_id, + "error": str(e), + }, + ) raise e def get_recent_jobs(self): @@ -103,7 +118,6 @@ def get_average_time(self): recent_jobs = sorted( recent_jobs, key=lambda x: x["end_time"], reverse=True )[:10] - print(recent_jobs, self.recent_jobs) if not recent_jobs: return 100 
total_time = sum( diff --git a/policyengine_api/services/policy_service.py b/policyengine_api/services/policy_service.py index abdca68b..7cba8b0e 100644 --- a/policyengine_api/services/policy_service.py +++ b/policyengine_api/services/policy_service.py @@ -23,5 +23,12 @@ def get_policy_json(self, country_id, policy_id): ).fetchone()["policy_json"] return policy_json except Exception as e: - logger.error(f"Error getting policy json: {str(e)}") + logger.error( + f"Error getting policy json", + context={ + "country_id": country_id, + "policy_id": policy_id, + "error": str(e), + }, + ) raise e diff --git a/policyengine_api/services/reform_impacts_service.py b/policyengine_api/services/reform_impacts_service.py index 6b817c62..ed868b10 100644 --- a/policyengine_api/services/reform_impacts_service.py +++ b/policyengine_api/services/reform_impacts_service.py @@ -24,7 +24,15 @@ def get_all_reform_impacts( api_version, ): logger.log( - f"Getting all reform impacts for country {country_id}, policy {policy_id}, baseline {baseline_policy_id}, region {region}, dataset {dataset}" + f"Getting all reform impacts", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + }, ) try: query = ( @@ -47,7 +55,18 @@ def get_all_reform_impacts( ), ).fetchall() except Exception as e: - logger.error(f"Error getting all reform impacts: {str(e)}") + logger.error( + f"Error getting all reform impacts", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "error": str(e), + }, + ) raise e def set_reform_impact( @@ -66,7 +85,18 @@ def set_reform_impact( start_time, ): logger.log( - f"Setting reform impact record for country {country_id}, policy {policy_id}, baseline {baseline_policy_id}, region {region}, dataset {dataset}" + f"Setting reform impact record with status {status}", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + "status": status, + "api_version": api_version, + }, ) try: query = ( @@ -92,7 +122,21 @@ def set_reform_impact( ), ) except Exception as e: - logger.error(f"Error setting reform impact: {str(e)}") + logger.error( + f"Error setting reform impact", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + "status": status, + "api_version": api_version, + "error": str(e), + }, + ) raise e def delete_reform_impact( @@ -106,7 +150,15 @@ def delete_reform_impact( options_hash, ): logger.log( - f"Deleteing reform impact for country {country_id}, policy {policy_id}, baseline {baseline_policy_id}, region {region}, dataset {dataset}" + f"Deleting reform impact for policy {policy_id} over {baseline_policy_id}", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + }, ) try: @@ -130,7 +182,18 @@ def delete_reform_impact( ), ) except Exception as e: - logger.error(f"Error deleting reform impact: {str(e)}") + logger.error( + f"Error deleting reform impact", + context={ + "country_id": country_id, + "policy_id": policy_id, + 
"baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "error": str(e), + }, + ) raise e def set_error_reform_impact( @@ -185,7 +248,17 @@ def set_complete_reform_impact( options_hash, reform_impact_json, ): - logger.log("Setting completed reform impact") + logger.log( + "Setting completed reform impact", + context={ + "country_id": country_id, + "reform_policy_id": reform_policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + }, + ) try: query = ( "UPDATE reform_impact SET status = ?, message = ?, end_time = ?, " @@ -213,5 +286,17 @@ def set_complete_reform_impact( ), ) except Exception as e: - logger.error(f"Error setting completed reform impact: {str(e)}") + logger.error( + f"Error setting completed reform impact", + context={ + "country_id": country_id, + "reform_policy_id": reform_policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options_hash": options_hash, + "error": str(e), + }, + ) raise e diff --git a/policyengine_api/services/simulation_analysis_service.py b/policyengine_api/services/simulation_analysis_service.py index 0f1aac4e..f3fa0391 100644 --- a/policyengine_api/services/simulation_analysis_service.py +++ b/policyengine_api/services/simulation_analysis_service.py @@ -70,7 +70,11 @@ def execute_analysis( return analysis except Exception as e: logger.error( - f"Error while triggering AI analysis; details: {str(e)}" + f"Error while triggering AI analysis", + context={ + "prompt": prompt, + "error": str(e), + }, ) raise e diff --git a/policyengine_api/services/tracer_analysis_service.py b/policyengine_api/services/tracer_analysis_service.py index 82c8f85b..20fd0c59 100644 --- a/policyengine_api/services/tracer_analysis_service.py +++ b/policyengine_api/services/tracer_analysis_service.py @@ -23,7 +23,13 @@ def execute_analysis( ): logger.log( - f"Generating tracer analysis for household {household_id}, policy {policy_id}, variable {variable} in {country_id}" + f"Generating tracer analysis", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "variable": variable, + }, ) api_version = COUNTRY_PACKAGE_VERSIONS[country_id] @@ -37,7 +43,16 @@ def execute_analysis( api_version, ) except Exception as e: - logger.error(f"Error retrieving tracer record: {str(e)}") + logger.error( + f"Error retrieving tracer record", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "variable": variable, + "error": str(e), + }, + ) raise e # Parse the tracer output for our given variable @@ -46,7 +61,16 @@ def execute_analysis( tracer, variable ) except Exception as e: - logger.error(f"Error parsing tracer output: {str(e)}") + logger.error( + f"Error parsing tracer output", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "variable": variable, + "error": str(e), + }, + ) raise e # Add the parsed tracer output to the prompt @@ -66,7 +90,15 @@ def execute_analysis( return analysis except Exception as e: logger.error( - f"Error generating AI analysis within tracer analysis service: {str(e)}" + f"Error generating AI analysis within tracer analysis service", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "variable": variable, + "prompt": prompt, + "error": str(e), + }, ) raise e @@ -77,7 +109,15 @@ 
         policy_id: str,
         api_version: str,
     ) -> list:
-        logger.log("Getting existing tracer analysis from tracers table")
+        logger.log(
+            f"Getting existing tracer analysis from tracers table",
+            context={
+                "country_id": country_id,
+                "household_id": household_id,
+                "policy_id": policy_id,
+                "api_version": api_version,
+            },
+        )
         try:
             # Retrieve from the tracers table in the local database
             row = local_database.query(
@@ -95,11 +135,22 @@ def get_tracer(

             return tracer_output_list
         except Exception as e:
-            logger.error(f"Error getting existing tracer analysis: {str(e)}")
+            logger.error(
+                f"Error getting existing tracer analysis",
+                context={
+                    "country_id": country_id,
+                    "household_id": household_id,
+                    "policy_id": policy_id,
+                    "api_version": api_version,
+                    "error": str(e),
+                },
+            )
             raise e

     def _parse_tracer_output(self, tracer_output, target_variable):
-        logger.log("Parsing tracer output for target variable")
+        logger.log(
+            f"Parsing tracer output for target variable {target_variable}"
+        )
         result = []
         target_indent = None

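Every method touched by this patch now repeats the same three steps: log the identifiers on entry, log the same identifiers plus the error on failure, then re-raise. If that boilerplate keeps growing, it could be factored into a decorator. The sketch below is one way to do it under the same call shape; log_context and its keyword-only argument handling are hypothetical, not part of this patch.

import functools


def log_context(logger, message, *keys):
    """Hypothetical decorator capturing the repeated pattern above:
    log `message` with selected keyword arguments as context, and log
    the same context plus the error before re-raising on failure."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Only captures arguments that are passed by keyword.
            context = {k: kwargs.get(k) for k in keys}
            logger.log(message, context=context)
            try:
                return func(*args, **kwargs)
            except Exception as e:
                logger.error(
                    f"Error in {func.__name__}",
                    context={**context, "error": str(e)},
                )
                raise

        return wrapper

    return decorator


# Usage, roughly equivalent to the hand-written pairs above:
#
# @log_context(logger, "Setting reform impact record", "country_id", "policy_id")
# def set_reform_impact(self, *, country_id, policy_id, ...):
#     ...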
From b0978e718a41d1b2f09fe65c78512d1a05025d9e Mon Sep 17 00:00:00 2001
From: Anthony Volk
Date: Mon, 9 Dec 2024 19:21:27 +0100
Subject: [PATCH 16/18] feat: Add proper memory logging functions

---
 .../jobs/calculate_economy_simulation_job.py | 60 +++++++++++++++++--
 policyengine_api/utils/worker_logger.py      | 32 +++++++++-
 2 files changed, 85 insertions(+), 7 deletions(-)

diff --git a/policyengine_api/jobs/calculate_economy_simulation_job.py b/policyengine_api/jobs/calculate_economy_simulation_job.py
index fa6d7035..9ea3ed53 100644
--- a/policyengine_api/jobs/calculate_economy_simulation_job.py
+++ b/policyengine_api/jobs/calculate_economy_simulation_job.py
@@ -22,6 +22,8 @@

 reform_impacts_service = ReformImpactsService()

+logger = WorkerLogger()
+

 class CalculateEconomySimulationJob(BaseJob):
     def __init__(self):
@@ -39,8 +41,18 @@ def run(
         baseline_policy: dict,
         reform_policy: dict,
     ):
-        logger = WorkerLogger()
-        logger.log(f"Starting CalculateEconomySimulationJob.run")
+        logger.log(
+            f"Starting CalculateEconomySimulationJob.run",
+            context={
+                "baseline_policy_id": baseline_policy_id,
+                "policy_id": policy_id,
+                "country_id": country_id,
+                "region": region,
+                "dataset": dataset,
+                "time_period": time_period,
+                "options": options,
+            },
+        )
         try:
             # Configure inputs
             # Note for anyone modifying options_hash: redis-queue treats ":" as a namespace
@@ -125,7 +137,16 @@ def run(
             comment = lambda x: set_comment_on_job(x, *identifiers)
             comment("Computing baseline")
-            logger.log("Computing baseline")
+            logger.log(
+                "Computing baseline",
+                context={
+                    "country_id": country_id,
+                    "region": region,
+                    "dataset": dataset,
+                    "time_period": time_period,
+                    "options": options,
+                },
+            )

             # Compute baseline economy
             baseline_economy = self._compute_economy(
                 country_id=country_id,
                 region=region,
                 dataset=dataset,
                 time_period=time_period,
                 options=options,
                 policy_json=baseline_policy,
             )
             comment("Computing reform")
-            logger.log("Computing reform")
+            logger.log(
+                "Computing reform",
+                context={
+                    "country_id": country_id,
+                    "region": region,
+                    "dataset": dataset,
+                    "time_period": time_period,
+                    "options": options,
+                },
+            )

             # Compute reform economy
             reform_economy = self._compute_economy(
@@ -152,6 +182,16 @@ def run(
             baseline_economy = baseline_economy["result"]
             reform_economy = reform_economy["result"]
             comment("Comparing baseline and reform")
+            logger.log(
+                "Comparing baseline and reform",
+                context={
+                    "country_id": country_id,
+                    "region": region,
+                    "dataset": dataset,
+                    "time_period": time_period,
+                    "options": options,
+                },
+            )
             impact = compare_economic_outputs(
                 baseline_economy, reform_economy, country_id=country_id
             )
@@ -179,7 +219,17 @@ def run(
                 options_hash,
                 message=traceback.format_exc(),
             )
-            logger.error(f"Error setting reform impact: {str(e)}")
+            logger.error(
+                f"Error setting reform impact",
+                context={
+                    "country_id": country_id,
+                    "region": region,
+                    "dataset": dataset,
+                    "time_period": time_period,
+                    "options": options,
+                    "error": str(e),
+                },
+            )
             raise e

     def _compute_economy(
diff --git a/policyengine_api/utils/worker_logger.py b/policyengine_api/utils/worker_logger.py
index d93e8124..3485dd72 100644
--- a/policyengine_api/utils/worker_logger.py
+++ b/policyengine_api/utils/worker_logger.py
@@ -6,7 +6,6 @@
 import psutil
 from typing import Optional
 import os
-from weakref import proxy
 import signal


@@ -61,6 +60,7 @@ def __init__(
                 threshold_percent=memory_threshold,
                 check_interval=memory_check_interval,
             )
+            self.memory_monitor.start()

         print(f"Initialized worker logger with ID: {self.id}")

@@ -91,6 +91,34 @@ def get_worker_id():
         # Default to timestamp if no other ID found
         return datetime.now().strftime("%Y%m%d_%H%M%S")

+    def log_memory_stats(
+        self, process_memory_mb, process_percent, system_percent
+    ):
+        """Log memory statistics"""
+        self.log(
+            "Memory usage stats",
+            level="info",
+            metric_type="memory_usage",
+            process_memory_mb=round(process_memory_mb, 2),
+            process_percent=round(process_percent, 2),
+            system_percent=round(system_percent, 2),
+        )
+
+    def log_memory_warning(self, message, **context):
+        """Log memory warning"""
+        self.log(
+            message, level="warning", metric_type="memory_warning", **context
+        )
+
+    def __enter__(self):
+        """Context manager entry"""
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """Context manager exit - ensure cleanup"""
+        if self.memory_monitor:
+            self.memory_monitor.stop()
+

 class MemoryMonitor:
     def __init__(self, threshold_percent=90, check_interval=5, logger=None):
@@ -105,7 +133,7 @@ def __init__(self, threshold_percent=90, check_interval=5, logger=None):
         self.check_interval: int = check_interval
         self.stop_flag = threading.Event()
         self.monitor_thread: Optional[threading.Thread] = None
-        self.logger: Logger = proxy(logger)
+        self.logger: Logger = logger
         self._pid: int = os.getpid()

     def start(self):
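Patch 16's key wiring change is lifecycle: the monitor thread now starts as soon as the logger is constructed, and __enter__/__exit__ guarantee it is stopped again. Reduced to essentials, the pattern is a daemon thread polling psutil until a threading.Event is set. The sketch below strips it down to that core; the class name, threshold, and the print stand-in for the logger are illustrative assumptions.

import threading
import time

import psutil  # same dependency the monitor above relies on


class PollingMonitor:
    """Stripped-down sketch of the start/stop lifecycle wired up above."""

    def __init__(self, interval=5.0, threshold_percent=75.0):
        self.interval = interval
        self.threshold_percent = threshold_percent
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        # Daemon thread: cannot keep the worker process alive by itself.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        if self._thread and self._thread.is_alive():
            self._stop.set()
            self._thread.join(timeout=1.0)

    def _run(self):
        while not self._stop.is_set():
            usage = psutil.virtual_memory().percent
            if usage > self.threshold_percent:
                print(f"High system memory usage: {usage:.1f}%")
            # wait() doubles as poll interval and stop signal, so stop()
            # takes effect without sleeping out the full interval.
            self._stop.wait(self.interval)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()


if __name__ == "__main__":
    with PollingMonitor(interval=1.0):
        time.sleep(3)  # monitored work would happen here

One design note: the sketch blocks on Event.wait() rather than time.sleep(), so stop() interrupts the pause immediately, whereas the time.sleep(self.check_interval) at the end of the original loop can delay shutdown by up to one full interval.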
From 47dba7e1d68d343f0c4e47f8fc6f1492c8096631 Mon Sep 17 00:00:00 2001
From: Anthony Volk
Date: Mon, 9 Dec 2024 19:41:47 +0100
Subject: [PATCH 17/18] fix: Remove memory logging

---
 policyengine_api/utils/worker_logger.py | 170 ------------------------
 1 file changed, 170 deletions(-)

diff --git a/policyengine_api/utils/worker_logger.py b/policyengine_api/utils/worker_logger.py
index 3485dd72..f232634c 100644
--- a/policyengine_api/utils/worker_logger.py
+++ b/policyengine_api/utils/worker_logger.py
@@ -1,12 +1,6 @@
 from policyengine_api.utils.logger import Logger
 from rq import Worker, get_current_job
 from datetime import datetime
-import time
-import threading
-import psutil
-from typing import Optional
-import os
-import signal


 class WorkerLogger(Logger):
@@ -20,9 +14,6 @@ def __init__(
         name: str = "worker",
         id: str = None,
         log_to_cloud: bool = True,
-        monitor_memory: bool = True,
-        memory_threshold: int = 75,
-        memory_check_interval: int = 5,
     ):
         """
         Initialize logger with automatic worker ID detection if none provided
@@ -53,15 +44,6 @@ def __init__(
             log_to_cloud=self.log_to_cloud,
         )

-        self.memory_monitor: MemoryMonitor = None
-        if monitor_memory:
-            self.memory_monitor = MemoryMonitor(
-                logger=self,
-                threshold_percent=memory_threshold,
-                check_interval=memory_check_interval,
-            )
-            self.memory_monitor.start()
-
         print(f"Initialized worker logger with ID: {self.id}")

     @staticmethod
@@ -90,155 +72,3 @@ def get_worker_id():

         # Default to timestamp if no other ID found
         return datetime.now().strftime("%Y%m%d_%H%M%S")
-
-    def log_memory_stats(
-        self, process_memory_mb, process_percent, system_percent
-    ):
-        """Log memory statistics"""
-        self.log(
-            "Memory usage stats",
-            level="info",
-            metric_type="memory_usage",
-            process_memory_mb=round(process_memory_mb, 2),
-            process_percent=round(process_percent, 2),
-            system_percent=round(system_percent, 2),
-        )
-
-    def log_memory_warning(self, message, **context):
-        """Log memory warning"""
-        self.log(
-            message, level="warning", metric_type="memory_warning", **context
-        )
-
-    def __enter__(self):
-        """Context manager entry"""
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        """Context manager exit - ensure cleanup"""
-        if self.memory_monitor:
-            self.memory_monitor.stop()
-
-
-class MemoryMonitor:
-    def __init__(self, threshold_percent=90, check_interval=5, logger=None):
-        """
-        Initialize memory monitor
-
-        Args:
-            threshold_percent (int): Memory usage threshold to trigger warnings (default: 75%)
-            check_interval (int): How often to check memory in seconds (default: 5)
-        """
-        self.threshold_percent: int = threshold_percent
-        self.check_interval: int = check_interval
-        self.stop_flag = threading.Event()
-        self.monitor_thread: Optional[threading.Thread] = None
-        self.logger: Logger = logger
-        self._pid: int = os.getpid()
-
-    def start(self):
-        """Start memory monitoring in a separate thread"""
-        self.stop_flag.clear()
-        self._pid = os.getpid()
-
-        self.monitor_thread = threading.Thread(target=self._monitor_memory)
-        self.monitor_thread.daemon = True
-        self.monitor_thread.start()
-
-        self._setup_signal_handlers()
-
-    def stop(self):
-        """Stop memory monitoring"""
-        if self.monitor_thread and self.monitor_thread.is_alive():
-            self.stop_flag.set()
-            self.monitor_thread.join(timeout=1.0)
-
-    def _setup_signal_handlers(self):
-        """Setup signal handlers to stop monitoring"""
-
-        for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
-            signal.signal(sig, self._handle_signal)
-
-    def _handle_signal(self, signum: signal):
-        """Signal handler to stop monitoring"""
-        self.logger.log(
-            f"Received signal {signum}, stopping memory monitor",
-            level="critical",
-        )
-        self.stop()
-
-    def _monitor_memory(self):
-        """Memory monitoring loop"""
-        process = psutil.Process()
-        while not self.stop_flag.is_set():
-            try:
-
-                if os.getpid() != self._pid:
-                    self.logger.log(
-                        "Memory monitor detected PID mismatch, stopping",
-                        level="warning",
-                    )
-                    break
-
-                try:
-                    process = psutil.Process(self._pid)
-                except psutil.NoSuchProcess:
-                    self.logger.log(
-                        "Memory monitor detected missing process, stopping",
-                        level="warning",
-                    )
-                    break
-
-                if not process.is_running():
-                    self.logger.log(
-                        "Memory monitor detected process stopped, stopping",
-                        level="warning",
-                    )
-                    break
-
-                try:
-                    # Get memory info
-                    memory_info = process.memory_info()
-                    system_memory = psutil.virtual_memory()
-                except Exception as e:
-                    self.logger.log(
-                        f"Error getting memory info: {str(e)}",
-                        level="error",
-                        error_type=type(e).__name__,
-                    )
-                    break
-
-                # Calculate usage percentages
-                process_percent = (memory_info.rss / system_memory.total) * 100
-                system_percent = system_memory.percent
-
-                # Log memory stats
-                self.logger.log_memory_stats(
-                    process_memory_mb=memory_info.rss / (1024 * 1024),
-                    process_percent=process_percent,
-                    system_percent=system_percent,
-                )
-
-                # Check for high memory usage
-                if system_percent > self.threshold_percent:
-                    self.logger.log_memory_warning(
-                        f"High system memory usage: {system_percent:.1f}%",
-                        system_percent=system_percent,
-                    )
-
-                if process_percent > (
-                    self.threshold_percent / 2
-                ):  # Process threshold at half of system
-                    self.logger.log_memory_warning(
-                        f"High process memory usage: {process_percent:.1f}%",
-                        process_percent=process_percent,
-                    )
-
-            except Exception as e:
-                self.logger.log(
-                    f"Error monitoring memory: {str(e)}",
-                    level="error",
-                    error_type=type(e).__name__,
-                )
-
-            time.sleep(self.check_interval)
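After this rollback, WorkerLogger is again a thin subclass whose only special behaviour is resolving an id: the current RQ job's worker name when running inside a worker, otherwise a timestamp. That kept fallback chain, condensed into a sketch (the intermediate Worker.find_by_key lookup is omitted, and rq's get_current_job and Job.worker_name are assumed to behave as the imports kept above suggest):

from datetime import datetime

from rq import get_current_job


def resolve_worker_id():
    """Condensed sketch of the get_worker_id() fallback chain."""
    try:
        job = get_current_job()  # None outside an RQ job context
        if job is not None and job.worker_name:
            return job.worker_name
    except Exception:
        # Mirror the original's broad except: any RQ hiccup falls
        # through to the timestamp default.
        pass
    return datetime.now().strftime("%Y%m%d_%H%M%S")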
From 5186b6f87870e34ce42cdd8e078316752a630ff9 Mon Sep 17 00:00:00 2001
From: Anthony Volk
Date: Mon, 9 Dec 2024 20:13:11 +0100
Subject: [PATCH 18/18] chore: Changelog

---
 changelog_entry.yaml | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/changelog_entry.yaml b/changelog_entry.yaml
index e69de29b..cde67109 100644
--- a/changelog_entry.yaml
+++ b/changelog_entry.yaml
@@ -0,0 +1,5 @@
+- bump: minor
+  changes:
+    added:
+    - Basic logging
+    - Logging for worker runs
\ No newline at end of file
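The changelog entry is plain YAML, so a malformed entry is cheap to catch before committing. A quick sanity check of the structure, assuming PyYAML is available in the project environment; the expected keys asserted here are inferred from the entry itself:

import yaml  # PyYAML, assumed available

ENTRY = """\
- bump: minor
  changes:
    added:
    - Basic logging
    - Logging for worker runs
"""

parsed = yaml.safe_load(ENTRY)
# Expect a single-item list holding a {bump, changes} mapping.
assert parsed[0]["bump"] == "minor"
assert "Basic logging" in parsed[0]["changes"]["added"]
print(parsed)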