diff --git a/.gitignore b/.gitignore index 91af8c3c..c3a5c3d6 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,8 @@ dist/* **/*.h5 **/*.csv.gz .env +logs/* +*.log # Ignore generated credentials from google-github-actions/auth gha-creds-*.json diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29b..cde67109 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,5 @@ +- bump: minor + changes: + added: + - Basic logging + - Logging for worker runs \ No newline at end of file diff --git a/policyengine_api/api.py b/policyengine_api/api.py index 3f3bd725..6759975c 100644 --- a/policyengine_api/api.py +++ b/policyengine_api/api.py @@ -7,7 +7,7 @@ import flask import yaml from flask_caching import Cache -from policyengine_api.utils import make_cache_key +from policyengine_api.utils import make_cache_key, Logger from .constants import VERSION # from werkzeug.middleware.profiler import ProfilerMiddleware @@ -38,7 +38,10 @@ get_simulations, ) -print("Initialising API...") +# Set up logging +logger = Logger() + +logger.log("Initialising API...") app = application = flask.Flask(__name__) @@ -147,4 +150,4 @@ def get_specification(): return flask.jsonify(openapi_spec) -print("API initialised.") +logger.log("API initialised.") diff --git a/policyengine_api/jobs/calculate_economy_simulation_job.py b/policyengine_api/jobs/calculate_economy_simulation_job.py index c8536876..9ea3ed53 100644 --- a/policyengine_api/jobs/calculate_economy_simulation_job.py +++ b/policyengine_api/jobs/calculate_economy_simulation_job.py @@ -14,6 +14,7 @@ from policyengine_api.endpoints.economy.reform_impact import set_comment_on_job from policyengine_api.constants import COUNTRY_PACKAGE_VERSIONS from policyengine_api.country import COUNTRIES, create_policy_reform +from policyengine_api.utils import WorkerLogger from policyengine_core.simulations import Microsimulation from policyengine_us import Microsimulation @@ -21,6 +22,8 @@ reform_impacts_service = ReformImpactsService() +logger = WorkerLogger() + class CalculateEconomySimulationJob(BaseJob): def __init__(self): @@ -38,7 +41,18 @@ def run( baseline_policy: dict, reform_policy: dict, ): - print(f"Starting CalculateEconomySimulationJob.run") + logger.log( + f"Starting CalculateEconomySimulationJob.run", + context={ + "baseline_policy_id": baseline_policy_id, + "policy_id": policy_id, + "country_id": country_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + }, + ) try: # Configure inputs # Note for anyone modifying options_hash: redis-queue treats ":" as a namespace @@ -61,7 +75,7 @@ def run( COUNTRY_PACKAGE_VERSIONS[country_id], ) if any(x["status"] == "ok" for x in existing): - print(f"Job already completed successfully") + logger.log(f"Job already completed successfully") return # Save identifiers for later commenting on processing status @@ -75,7 +89,7 @@ def run( options_hash, ) - print("Checking existing reform impacts...") + logger.log("Checking existing reform impacts...") # Query existing impacts before deleting existing = reform_impacts_service.get_all_reform_impacts( country_id, @@ -87,7 +101,7 @@ def run( options_hash, COUNTRY_PACKAGE_VERSIONS[country_id], ) - print(f"Found {len(existing)} existing impacts before delete") + logger.log(f"Found {len(existing)} existing impacts before delete") # Delete any existing reform impact rows with the same identifiers reform_impacts_service.delete_reform_impact( @@ -100,7 +114,7 @@ def run( options_hash, ) - print("Deleted existing 
computing impacts") + logger.log("Deleted existing computing impacts") # Insert new reform impact row with status 'computing' reform_impacts_service.set_reform_impact( @@ -123,6 +137,16 @@ def run( comment = lambda x: set_comment_on_job(x, *identifiers) comment("Computing baseline") + logger.log( + "Computing baseline", + context={ + "country_id": country_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + }, + ) # Compute baseline economy baseline_economy = self._compute_economy( @@ -134,6 +158,16 @@ def run( policy_json=baseline_policy, ) comment("Computing reform") + logger.log( + "Computing reform", + context={ + "country_id": country_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + }, + ) # Compute reform economy reform_economy = self._compute_economy( @@ -148,6 +182,16 @@ def run( baseline_economy = baseline_economy["result"] reform_economy = reform_economy["result"] comment("Comparing baseline and reform") + logger.log( + "Comparing baseline and reform", + context={ + "country_id": country_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + }, + ) impact = compare_economic_outputs( baseline_economy, reform_economy, country_id=country_id ) @@ -175,7 +219,17 @@ def run( options_hash, message=traceback.format_exc(), ) - print(f"Error setting reform impact: {str(e)}") + logger.error( + f"Error setting reform impact", + context={ + "country_id": country_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + "error": str(e), + }, + ) raise e def _compute_economy( @@ -222,7 +276,7 @@ def _compute_economy( simulation.delete_arrays("person_weight", time_period) if options.get("target") == "cliff": - print(f"Initialised cliff impact computation") + logger.log(f"Initialised cliff impact computation") return { "status": "ok", "result": self._compute_cliff_impacts(simulation), @@ -282,7 +336,7 @@ def _create_simulation_us( # Second statement provides backwards compatibility option # for running a simulation with the "enhanced_us" region if dataset in DATASETS or region == "enhanced_us": - print(f"Running an enhanced CPS simulation") + logger.log(f"Running an enhanced CPS simulation") from policyengine_us_data import EnhancedCPS_2024 sim_options["dataset"] = EnhancedCPS_2024 @@ -290,7 +344,7 @@ def _create_simulation_us( # Handle region settings; need to be mindful not to place # legacy enhanced_us region in this block if region not in ["us", "enhanced_us"]: - print(f"Filtering US dataset down to region {region}") + logger.log(f"Filtering US dataset down to region {region}") from policyengine_us_data import Pooled_3_Year_CPS_2023 diff --git a/policyengine_api/routes/economy_routes.py b/policyengine_api/routes/economy_routes.py index a1e922b4..8c8bb146 100644 --- a/policyengine_api/routes/economy_routes.py +++ b/policyengine_api/routes/economy_routes.py @@ -5,14 +5,25 @@ from policyengine_api.constants import COUNTRY_PACKAGE_VERSIONS from flask import request, Response import json +from policyengine_api.utils import Logger economy_bp = Blueprint("economy", __name__) economy_service = EconomyService() +logger = Logger() + @validate_country @economy_bp.route("//over/", methods=["GET"]) def get_economic_impact(country_id, policy_id, baseline_policy_id): + logger.log( + f"GET request received for get_economic_impact", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": 
baseline_policy_id, + }, + ) policy_id = int(policy_id or get_current_law_policy_id(country_id)) baseline_policy_id = int( @@ -43,6 +54,15 @@ def get_economic_impact(country_id, policy_id, baseline_policy_id): ) return result except Exception as e: + logger.error( + f"Error within get_economic_impact", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "error": str(e), + }, + ) return Response( { "status": "error", diff --git a/policyengine_api/routes/simulation_analysis_routes.py b/policyengine_api/routes/simulation_analysis_routes.py index 5326989a..38d006a9 100644 --- a/policyengine_api/routes/simulation_analysis_routes.py +++ b/policyengine_api/routes/simulation_analysis_routes.py @@ -8,15 +8,18 @@ validate_sim_analysis_payload, validate_country, ) +from policyengine_api.utils.logger import Logger simulation_analysis_bp = Blueprint("simulation_analysis", __name__) simulation_analysis_service = SimulationAnalysisService() +logger = Logger() + @simulation_analysis_bp.route("", methods=["POST"]) @validate_country def execute_simulation_analysis(country_id): - print("Got POST request for simulation analysis") + logger.log("Got POST request for simulation analysis") # Pop items from request payload and validate # where necessary @@ -68,6 +71,9 @@ def execute_simulation_analysis(country_id): return response except Exception as e: + logger.error( + f"Error while executing simulation analysis; details: {str(e)}" + ) return Response( json.dumps( { diff --git a/policyengine_api/routes/tracer_analysis_routes.py b/policyengine_api/routes/tracer_analysis_routes.py index d15b0dd4..7df3f155 100644 --- a/policyengine_api/routes/tracer_analysis_routes.py +++ b/policyengine_api/routes/tracer_analysis_routes.py @@ -3,6 +3,7 @@ validate_country, validate_tracer_analysis_payload, ) +from policyengine_api.utils.logger import Logger from policyengine_api.services.tracer_analysis_service import ( TracerAnalysisService, ) @@ -11,10 +12,13 @@ tracer_analysis_bp = Blueprint("tracer_analysis", __name__) tracer_analysis_service = TracerAnalysisService() +logger = Logger() + @tracer_analysis_bp.route("", methods=["POST"]) @validate_country def execute_tracer_analysis(country_id): + logger.log("Got POST request for tracer analysis") payload = request.json @@ -51,6 +55,15 @@ def execute_tracer_analysis(country_id): """ This exception is raised when the tracer can't find a household tracer record """ + logger.log( + f"No household simulation tracer found", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "variable": variable, + }, + ) return Response( json.dumps( { @@ -62,6 +75,16 @@ def execute_tracer_analysis(country_id): 404, ) except Exception as e: + logger.error( + f"Error while executing tracer analysis", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "variable": variable, + "error": str(e), + }, + ) return Response( json.dumps( { diff --git a/policyengine_api/services/ai_analysis_service.py b/policyengine_api/services/ai_analysis_service.py index 9e6556a0..0a859491 100644 --- a/policyengine_api/services/ai_analysis_service.py +++ b/policyengine_api/services/ai_analysis_service.py @@ -4,6 +4,9 @@ import time from typing import Generator, Optional from policyengine_api.data import local_database +from policyengine_api.utils.logger import Logger + +logger = Logger() class AIAnalysisService: @@ -19,6 +22,7 @@ def get_existing_analysis( """ Get existing 
analysis from the local database """ + logger.log(f"Getting existing analysis") analysis = local_database.query( f"SELECT analysis FROM analysis WHERE prompt = ?", @@ -26,6 +30,7 @@ def get_existing_analysis( ).fetchone() if analysis is None: + logger.log(f"No existing analysis found") return None def generate(): @@ -46,6 +51,7 @@ def generate(): return generate() def trigger_ai_analysis(self, prompt: str) -> Generator[str, None, None]: + logger.log("Triggering AI analysis") # Configure a Claude client claude_client = anthropic.Anthropic( @@ -83,6 +89,8 @@ def generate(): if buffer: yield json.dumps({"stream": buffer}) + "\n" + logger.log("Updating analysis record") + # Finally, update the analysis record and return local_database.query( f"INSERT INTO analysis (prompt, analysis, status) VALUES (?, ?, ?)", diff --git a/policyengine_api/services/economy_service.py b/policyengine_api/services/economy_service.py index 66beaceb..4b091d53 100644 --- a/policyengine_api/services/economy_service.py +++ b/policyengine_api/services/economy_service.py @@ -3,6 +3,7 @@ from policyengine_api.services.reform_impacts_service import ( ReformImpactsService, ) +from policyengine_api.utils.logger import Logger from policyengine_api.data import local_database, database import json import datetime @@ -11,6 +12,8 @@ job_service = JobService() reform_impacts_service = ReformImpactsService() +logger = Logger() + class EconomyService: """ @@ -36,13 +39,25 @@ def get_economic_impact( options_hash = ( "[" + "&".join([f"{k}={v}" for k, v in options.items()]) + "]" ) - print("Checking if already calculated") + logger.log( + f"Checking if economic impact already calculated", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + "api_version": api_version, + }, + ) # Create job ID job_id = f"reform_impact_{country_id}_{policy_id}_{baseline_policy_id}_{region}_{dataset}_{time_period}_{options_hash}_{api_version}" # First, check if already calculated - print("Checking previous impacts...") + logger.log("Checking previous impacts...") previous_impacts = self._get_previous_impacts( country_id, policy_id, @@ -53,14 +68,14 @@ def get_economic_impact( options_hash, api_version, ) - print(f"Found {len(previous_impacts)} previous impacts") - print( + logger.log(f"Found {len(previous_impacts)} previous impacts") + logger.log( f"Previous impacts status: {[imp.get('status') for imp in previous_impacts]}" ) if len(previous_impacts) == 0: # Add job to recent job list - print("No previous impacts found, creating new job...") + logger.log("No previous impacts found, creating new job...") job_service.add_recent_job( job_id=job_id, type="calculate_economy_simulation", @@ -82,7 +97,14 @@ def get_economic_impact( ) # Get baseline and reform policy - print("Fetching baseline and reform policies") + logger.log( + "Fetching baseline and reform policies", + context={ + "country_id": country_id, + "baseline_policy_id": baseline_policy_id, + "policy_id": policy_id, + }, + ) baseline_policy = policy_service.get_policy_json( country_id, baseline_policy_id ) @@ -91,7 +113,17 @@ def get_economic_impact( ) # Enqueue job - print("Enqueuing job") + logger.log( + "Enqueuing job", + context={ + "baseline_policy_id": baseline_policy_id, + "policy_id": policy_id, + "country_id": country_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + }, + ) job_service.execute_job( 
type="calculate_economy_simulation", baseline_policy_id=baseline_policy_id, @@ -117,7 +149,7 @@ def get_economic_impact( 200, ) else: - print( + logger.log( f"Found previous impacts, first status: {previous_impacts[0]['status']}" ) ok_results = [ @@ -167,7 +199,20 @@ def get_economic_impact( ) except Exception as e: - print(f"Error getting economic impact: {str(e)}") + logger.error( + f"Error getting economic impact", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + "api_version": api_version, + "error": str(e), + }, + ) raise e def _get_previous_impacts( @@ -213,6 +258,17 @@ def _set_impact_computing( options_hash, api_version, ): + logger.log( + "Setting impact computing record", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + }, + ) try: reform_impacts_service.set_reform_impact( country_id, @@ -229,5 +285,18 @@ def _set_impact_computing( datetime.datetime.now(), ) except Exception as e: - print(f"Error inserting computing record: {str(e)}") + logger.error( + f"Error inserting computing record", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + "api_version": api_version, + "error": str(e), + }, + ) raise e diff --git a/policyengine_api/services/job_service.py b/policyengine_api/services/job_service.py index 721e44ec..3b11bf8a 100644 --- a/policyengine_api/services/job_service.py +++ b/policyengine_api/services/job_service.py @@ -5,10 +5,12 @@ from policyengine_api.jobs import CalculateEconomySimulationJob from datetime import datetime from enum import Enum +from policyengine_api.utils.logger import Logger calc_ec_sim_job = CalculateEconomySimulationJob() queue = Queue(connection=Redis()) +logger = Logger() class JobStatus(Enum): @@ -37,7 +39,7 @@ def execute_job(self, job_id, job_timeout, type, *args, **kwargs): "finished", "failed", ]: - print( + logger.log( f"Job {job_id} already exists and is {existing_job.get_status()}" ) return @@ -59,7 +61,14 @@ def execute_job(self, job_id, job_timeout, type, *args, **kwargs): self._prune_recent_jobs() except Exception as e: - print(f"Error executing job: {str(e)}") + logger.error( + f"Error executing job", + context={ + "job_id": job_id, + "type": type, + "error": str(e), + }, + ) raise e def fetch_job_queue_pos(self, job_id): @@ -72,7 +81,13 @@ def fetch_job_queue_pos(self, job_id): ) return pos except Exception as e: - print(f"Error fetching job queue position: {str(e)}") + logger.error( + f"Error fetching job queue position", + context={ + "job_id": job_id, + "error": str(e), + }, + ) raise e def get_recent_jobs(self): @@ -103,7 +118,6 @@ def get_average_time(self): recent_jobs = sorted( recent_jobs, key=lambda x: x["end_time"], reverse=True )[:10] - print(recent_jobs, self.recent_jobs) if not recent_jobs: return 100 total_time = sum( diff --git a/policyengine_api/services/policy_service.py b/policyengine_api/services/policy_service.py index 0a820f83..7cba8b0e 100644 --- a/policyengine_api/services/policy_service.py +++ b/policyengine_api/services/policy_service.py @@ -1,4 +1,7 @@ from policyengine_api.data import database +from policyengine_api.utils.logger import Logger + +logger = Logger() class 
PolicyService: @@ -9,6 +12,10 @@ class PolicyService: """ def get_policy_json(self, country_id, policy_id): + logger.log( + f"Getting policy json for country {country_id}, policy {policy_id}" + ) + try: policy_json = database.query( f"SELECT policy_json FROM policy WHERE country_id = ? AND id = ?", @@ -16,5 +23,12 @@ def get_policy_json(self, country_id, policy_id): ).fetchone()["policy_json"] return policy_json except Exception as e: - print(f"Error getting policy json: {str(e)}") + logger.error( + f"Error getting policy json", + context={ + "country_id": country_id, + "policy_id": policy_id, + "error": str(e), + }, + ) raise e diff --git a/policyengine_api/services/reform_impacts_service.py b/policyengine_api/services/reform_impacts_service.py index c6bb2d06..ed868b10 100644 --- a/policyengine_api/services/reform_impacts_service.py +++ b/policyengine_api/services/reform_impacts_service.py @@ -1,6 +1,9 @@ from policyengine_api.data import local_database +from policyengine_api.utils.logger import Logger import datetime +logger = Logger() + class ReformImpactsService: """ @@ -20,6 +23,17 @@ def get_all_reform_impacts( options_hash, api_version, ): + logger.log( + f"Getting all reform impacts", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + }, + ) try: query = ( "SELECT reform_impact_json, status, message, start_time FROM " @@ -41,7 +55,18 @@ def get_all_reform_impacts( ), ).fetchall() except Exception as e: - print(f"Error getting all reform impacts: {str(e)}") + logger.error( + f"Error getting all reform impacts", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "error": str(e), + }, + ) raise e def set_reform_impact( @@ -59,6 +84,20 @@ def set_reform_impact( reform_impact_json, start_time, ): + logger.log( + f"Setting reform impact record with status {status}", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + "status": status, + "api_version": api_version, + }, + ) try: query = ( "INSERT INTO reform_impact (country_id, reform_policy_id, baseline_policy_id, " @@ -83,7 +122,21 @@ def set_reform_impact( ), ) except Exception as e: - print(f"Error setting reform impact: {str(e)}") + logger.error( + f"Error setting reform impact", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options": options, + "status": status, + "api_version": api_version, + "error": str(e), + }, + ) raise e def delete_reform_impact( @@ -96,6 +149,18 @@ def delete_reform_impact( time_period, options_hash, ): + logger.log( + f"Deleting reform impact for policy {policy_id} over {baseline_policy_id}", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + }, + ) + try: query = ( "DELETE FROM reform_impact WHERE country_id = ? 
AND " @@ -117,7 +182,18 @@ def delete_reform_impact( ), ) except Exception as e: - print(f"Error deleting reform impact: {str(e)}") + logger.error( + f"Error deleting reform impact", + context={ + "country_id": country_id, + "policy_id": policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "error": str(e), + }, + ) raise e def set_error_reform_impact( @@ -156,7 +232,7 @@ def set_error_reform_impact( ), ) except Exception as e: - print( + logger.error( f"Error setting error reform impact (something must be REALLY wrong): {str(e)}" ) raise e @@ -172,6 +248,17 @@ def set_complete_reform_impact( options_hash, reform_impact_json, ): + logger.log( + "Setting completed reform impact", + context={ + "country_id": country_id, + "reform_policy_id": reform_policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + }, + ) try: query = ( "UPDATE reform_impact SET status = ?, message = ?, end_time = ?, " @@ -199,5 +286,17 @@ def set_complete_reform_impact( ), ) except Exception as e: - print(f"Error setting completed reform impact: {str(e)}") + logger.error( + f"Error setting completed reform impact", + context={ + "country_id": country_id, + "reform_policy_id": reform_policy_id, + "baseline_policy_id": baseline_policy_id, + "region": region, + "dataset": dataset, + "time_period": time_period, + "options_hash": options_hash, + "error": str(e), + }, + ) raise e diff --git a/policyengine_api/services/simulation_analysis_service.py b/policyengine_api/services/simulation_analysis_service.py index 2e5c31b2..f3fa0391 100644 --- a/policyengine_api/services/simulation_analysis_service.py +++ b/policyengine_api/services/simulation_analysis_service.py @@ -1,6 +1,9 @@ import json from policyengine_api.services.ai_analysis_service import AIAnalysisService +from policyengine_api.utils.logger import Logger + +logger = Logger() class SimulationAnalysisService(AIAnalysisService): @@ -31,7 +34,7 @@ def execute_analysis( # Check if the region is enhanced_cps is_enhanced_cps = "enhanced_us" in region - print("Generating prompt for economy-wide simulation analysis") + logger.log(f"Generating prompt for economy-wide simulation analysis") # Create prompt based on data prompt = self._generate_simulation_analysis_prompt( @@ -51,14 +54,14 @@ def execute_analysis( # Add audience description to end prompt += self.audience_descriptions[audience] - print("Checking if AI analysis already exists for this prompt") + logger.log("Checking if AI analysis already exists for this prompt") # If a calculated record exists for this prompt, return it as a # streaming response existing_analysis = self.get_existing_analysis(prompt) if existing_analysis is not None: return existing_analysis - print( + logger.log( "Found no existing AI analysis; triggering new analysis with Claude" ) # Otherwise, pass prompt to Claude, then return streaming function @@ -66,6 +69,13 @@ def execute_analysis( analysis = self.trigger_ai_analysis(prompt) return analysis except Exception as e: + logger.error( + f"Error while triggering AI analysis", + context={ + "prompt": prompt, + "error": str(e), + }, + ) raise e def _generate_simulation_analysis_prompt( diff --git a/policyengine_api/services/tracer_analysis_service.py b/policyengine_api/services/tracer_analysis_service.py index 162245d2..20fd0c59 100644 --- a/policyengine_api/services/tracer_analysis_service.py +++ b/policyengine_api/services/tracer_analysis_service.py @@ 
-5,6 +5,9 @@ import re import anthropic from policyengine_api.services.ai_analysis_service import AIAnalysisService +from policyengine_api.utils.logger import Logger + +logger = Logger() class TracerAnalysisService(AIAnalysisService): @@ -19,6 +22,16 @@ def execute_analysis( variable: str, ): + logger.log( + f"Generating tracer analysis", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "variable": variable, + }, + ) + api_version = COUNTRY_PACKAGE_VERSIONS[country_id] # Retrieve tracer record from table @@ -30,6 +43,16 @@ def execute_analysis( api_version, ) except Exception as e: + logger.error( + f"Error retrieving tracer record", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "variable": variable, + "error": str(e), + }, + ) raise e # Parse the tracer output for our given variable @@ -38,7 +61,16 @@ def execute_analysis( tracer, variable ) except Exception as e: - print(f"Error parsing tracer output: {str(e)}") + logger.error( + f"Error parsing tracer output", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "variable": variable, + "error": str(e), + }, + ) raise e # Add the parsed tracer output to the prompt @@ -57,8 +89,16 @@ def execute_analysis( analysis: Generator = self.trigger_ai_analysis(prompt) return analysis except Exception as e: - print( - f"Error generating AI analysis within tracer analysis service: {str(e)}" + logger.error( + f"Error generating AI analysis within tracer analysis service", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "variable": variable, + "prompt": prompt, + "error": str(e), + }, ) raise e @@ -69,6 +109,15 @@ def get_tracer( policy_id: str, api_version: str, ) -> list: + logger.log( + f"Getting existing tracer analysis from tracers table", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "api_version": api_version, + }, + ) try: # Retrieve from the tracers table in the local database row = local_database.query( @@ -86,10 +135,23 @@ def get_tracer( return tracer_output_list except Exception as e: - print(f"Error getting existing tracer analysis: {str(e)}") + logger.error( + f"Error getting existing tracer analysis", + context={ + "country_id": country_id, + "household_id": household_id, + "policy_id": policy_id, + "api_version": api_version, + "error": str(e), + }, + ) raise e def _parse_tracer_output(self, tracer_output, target_variable): + logger.log( + f"Parsing tracer output for target variable {target_variable}" + ) + result = [] target_indent = None capturing = False diff --git a/policyengine_api/utils/__init__.py b/policyengine_api/utils/__init__.py index 07a267c6..381f2792 100644 --- a/policyengine_api/utils/__init__.py +++ b/policyengine_api/utils/__init__.py @@ -2,3 +2,5 @@ from .cache_utils import * from .singleton import Singleton from .get_current_law import get_current_law_policy_id +from .logger import Logger +from .worker_logger import WorkerLogger diff --git a/policyengine_api/utils/logger.py b/policyengine_api/utils/logger.py new file mode 100644 index 00000000..63f4fdc2 --- /dev/null +++ b/policyengine_api/utils/logger.py @@ -0,0 +1,116 @@ +import logging +import sys +from google.cloud import logging as cloud_logging +from datetime import datetime +from pathlib import Path +import os + + +class Logger: + def __init__( + self, + dir: str = "logs", + name: str = "api_main", + id: 
str = None, + log_to_cloud: bool = True, + ): + """ + Initialize standard logger + + Filepath: + dir/name_id.log + + Args: + dir (str): Directory to store log files (defaults to "logs") + name (str): Name of the logger (defaults to "api_main") + id (str): ID to append to log file name; if not provided, will use current timestamp + log_to_cloud (bool): Whether to log to Google Cloud Logging + """ + # Generate three parts of storage path + self.dir = Path(dir) + self.name = name + # Don't pass datetime to initializer, otherwise time is equal to when class is created + self.id = ( + id if id is not None else datetime.now().strftime("%Y%m%d_%H%M%S") + ) + + self.logger = logging.getLogger(self.name) + self.logger.setLevel(logging.INFO) + + # Create log directory if it doesn't exist + try: + self.dir.mkdir(parents=True, exist_ok=True) + except Exception as e: + print( + f"Warning: Could not create log directory {self.dir}: {str(e)}" + ) + # Fall back to current directory + self.dir = Path(".") + + # Create log file path based upon directory + self.filepath = self.dir.joinpath(f"{self.name}_{self.id}.log") + + self.cloud_client: cloud_logging.Client = None + + # Prevent duplicate handlers + if not self.logger.handlers: + # File handler - logs to local file + file_handler = logging.FileHandler(str(self.filepath)) + file_handler.setLevel(logging.INFO) + file_handler.setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + ) + self.logger.addHandler(file_handler) + + # Console handler + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(logging.INFO) + console_handler.setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + ) + self.logger.addHandler(console_handler) + + # Google Cloud Logging handler; don't log to GCP if in debug + if log_to_cloud and os.environ.get("FLASK_DEBUG") != "1": + try: + cloud_client = cloud_logging.Client() + except Exception as e: + print(f"Google Cloud Logging error: {str(e)}") + return + self.cloud_client = cloud_client + # Attach a single handler, named after this logger, so + # entries group under one Cloud Logging log stream + cloud_handler = cloud_logging.handlers.CloudLoggingHandler( + cloud_client, name=self.name + ) + cloud_handler.setLevel(logging.INFO) + cloud_handler.setFormatter( + logging.Formatter("%(levelname)s - %(message)s") + ) + self.logger.addHandler(cloud_handler) + + def log(self, message: str, level: str = "info", context: dict = None, **kwargs): + """ + Log a message with optional context data + + Args: + message (str): Message to log + level (str): Log level (default: "info") + context (dict): Optional context data to log; merged with any extra keyword arguments + """ + # Merge the explicit context dict with any extra keyword context + context = {**(context or {}), **kwargs} + if context: + context_str = " ".join(f"{k}={v}" for k, v in context.items()) + message = f"{message} | {context_str}" + + log_func = getattr(self.logger, level.lower()) + log_func(message) + + def error(self, message: str, context: dict = None, **kwargs): + """Convenience method to log an error message""" + self.log(message, level="error", context=context, **kwargs) diff --git a/policyengine_api/utils/worker_logger.py b/policyengine_api/utils/worker_logger.py new file mode 100644 index 00000000..f232634c --- /dev/null +++ b/policyengine_api/utils/worker_logger.py @@ -0,0 +1,59 @@ +from policyengine_api.utils.logger import Logger +from rq import get_current_job +from datetime import datetime + + +class WorkerLogger(Logger): + """ + Custom logger for worker processes + """ + + def __init__( + self, + dir: str = "logs", + name: str = "worker", + id: str = None, +
log_to_cloud: bool = True, + ): + """ + Initialize logger with automatic worker ID detection if none provided + + All args optional + Args: + dir (str): Directory to store log files (defaults to "logs") + name (str): Name of the logger (defaults to "worker") + id (str): ID to append to log file name; if not provided, will fetch worker name + log_to_cloud (bool): Whether to log to Google Cloud Logging (defaults to True) + """ + self.dir = dir + self.name = name + self.log_to_cloud = log_to_cloud + + if id is not None: + self.id = id + else: + self.id = self.get_worker_id() + + super().__init__( + dir=self.dir, + name=self.name, + id=self.id, + log_to_cloud=self.log_to_cloud, + ) + + self.log(f"Initialized worker logger with ID: {self.id}") + + @staticmethod + def get_worker_id(): + """ + Attempts to get the worker ID through the following methods: + 1. From the name of the worker running the current RQ job + 2. Falls back to a timestamp-based default if no worker name is found + """ + # Try to get from current job context + current_job = get_current_job() + if current_job and current_job.worker_name: + return current_job.worker_name + + # Default to timestamp if no worker name is available + return datetime.now().strftime("%Y%m%d_%H%M%S") diff --git a/policyengine_api/worker.py b/policyengine_api/worker.py index 8bbc9841..8c6269a7 100644 --- a/policyengine_api/worker.py +++ b/policyengine_api/worker.py @@ -1,6 +1,8 @@ from redis import Redis from rq import Worker +from policyengine_api.utils.worker_logger import WorkerLogger + # Preload libraries import policyengine_uk import policyengine_us @@ -9,4 +11,5 @@ # Provide the worker with the list of queues (str) to listen to. 
w = Worker(["default"], connection=Redis()) +logger = WorkerLogger(id=w.name) w.work() diff --git a/tests/fixtures/logging_fixtures.py b/tests/fixtures/logging_fixtures.py new file mode 100644 index 00000000..6f9a092d --- /dev/null +++ b/tests/fixtures/logging_fixtures.py @@ -0,0 +1,55 @@ +import pytest +import os +from unittest.mock import patch, Mock +import logging +from datetime import datetime + +TEST_LOGGER_NAME = "test_logger" +TEST_LOGGER_ID = "20240101_120000" + + +@pytest.fixture(autouse=True) +def mock_flask_debug(): + """ + Fixture to set FLASK_DEBUG=0 for all tests; this is + necessary because the logger has special behavior in debug + """ + with patch.dict(os.environ, {"FLASK_DEBUG": "0"}): + yield + + +@pytest.fixture(autouse=True) +def clear_handlers(): + """Fixture to clear handlers before each test""" + logging.getLogger(TEST_LOGGER_NAME).handlers.clear() + yield + logging.getLogger(TEST_LOGGER_NAME).handlers.clear() + + +@pytest.fixture +def temp_log_dir(tmp_path): + """Fixture to create a temporary log directory""" + log_dir = tmp_path / "logs" + log_dir.mkdir(exist_ok=True) # Create the directory + return log_dir + + +@pytest.fixture +def mock_datetime(): + """Fixture to mock datetime for consistent id""" + with patch("policyengine_api.utils.logger.datetime") as mock_dt: + mock_dt.now.return_value = datetime(2024, 1, 1, 12, 0, 0) + yield mock_dt + + +@pytest.fixture +def mock_cloud_logging(): + """Fixture to mock Google Cloud Logging""" + with patch( + "policyengine_api.utils.logger.cloud_logging.Client" + ) as mock_client: + mock_handler = Mock() + mock_client.return_value.get_default_handler.return_value = ( + mock_handler + ) + yield mock_client diff --git a/tests/python/test_logging.py b/tests/python/test_logging.py new file mode 100644 index 00000000..c7622c4d --- /dev/null +++ b/tests/python/test_logging.py @@ -0,0 +1,142 @@ +import pytest +import logging +from pathlib import Path +import os +from unittest.mock import Mock, patch, MagicMock +from datetime import datetime +from google.cloud import logging as cloud_logging +from policyengine_api.utils.logger import Logger + +from tests.fixtures.logging_fixtures import ( + mock_flask_debug, + clear_handlers, + temp_log_dir, + mock_datetime, + mock_cloud_logging, + TEST_LOGGER_NAME, + TEST_LOGGER_ID, +) + + +class TestLogger: + def test_logger_initialization(self, temp_log_dir, mock_datetime): + """Test basic logger initialization""" + logger = Logger( + dir=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False + ) + + # Check if logger was properly initialized + assert logger.name == TEST_LOGGER_NAME + assert logger.id == TEST_LOGGER_ID + assert logger.logger.level == logging.INFO + + # Verify log directory creation + expected_log_dir = temp_log_dir + assert expected_log_dir.exists() + + # Verify log file creation + expected_log_file = expected_log_dir / ( + TEST_LOGGER_NAME + "_" + TEST_LOGGER_ID + ".log" + ) + assert expected_log_file.exists() + + def test_logger_handlers(self, temp_log_dir): + """Test that appropriate handlers are added""" + logger = Logger( + dir=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False + ) + + # Should have exactly 2 handlers (file and console) when cloud logging is disabled + assert len(logger.logger.handlers) == 2 + + # Verify handler types + handlers = logger.logger.handlers + assert any(isinstance(h, logging.FileHandler) for h in handlers) + assert any(isinstance(h, logging.StreamHandler) for h in handlers) + + def test_cloud_logging_initialization( + self, 
temp_log_dir, mock_cloud_logging + ): + """Test initialization with cloud logging enabled""" + logger = Logger( + dir=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=True + ) + + # Should have 3 handlers (file, console, and cloud) + assert len(logger.logger.handlers) == 3 + mock_cloud_logging.assert_called_once() + + def test_log_message(self, temp_log_dir, mock_datetime): + """Test logging a message""" + logger = Logger( + dir=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False + ) + + test_message = "Test log message" + logger.log(test_message) + + # Read the log file and verify the message was logged + log_file = temp_log_dir / ( + TEST_LOGGER_NAME + "_" + TEST_LOGGER_ID + ".log" + ) + assert log_file.exists(), f"Log file not found at {log_file}" + with open(log_file, "r") as f: + log_content = f.read() + assert test_message in log_content + + def test_log_with_context(self, temp_log_dir, mock_datetime): + """Test logging a message with context""" + logger = Logger( + dir=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False + ) + + test_message = "Test message" + context = {"user": "test_user", "action": "login"} + logger.log(test_message, **context) + + # Read the log file and verify the message and context were logged + log_file = temp_log_dir / ( + TEST_LOGGER_NAME + "_" + TEST_LOGGER_ID + ".log" + ) + assert log_file.exists(), f"Log file not found at {log_file}" + with open(log_file, "r") as f: + log_content = f.read() + assert test_message in log_content + assert "user=test_user" in log_content + assert "action=login" in log_content + + def test_error_logging(self, temp_log_dir, mock_datetime): + """Test error logging functionality""" + logger = Logger( + dir=str(temp_log_dir), name=TEST_LOGGER_NAME, log_to_cloud=False + ) + + error_message = "Test error message" + logger.error(error_message) + + # Use the exact log file path based on the mocked datetime + log_file = temp_log_dir / ( + TEST_LOGGER_NAME + "_" + TEST_LOGGER_ID + ".log" + ) + assert log_file.exists(), f"Log file not found at {log_file}" + + with open(log_file, "r") as f: + log_content = f.read() + assert ( + error_message in log_content + ), "Error message not found in log content" + assert "ERROR" in log_content, "ERROR level not found in log content" + + def test_failed_directory_creation(self, temp_log_dir, mock_datetime): + """Test fallback behavior when log directory creation fails""" + with patch("pathlib.Path.mkdir") as mock_mkdir: + mock_mkdir.side_effect = PermissionError("Permission denied") + + logger = Logger( + dir=str(temp_log_dir), + name=TEST_LOGGER_NAME, + log_to_cloud=False, + ) + + # Should fallback to current directory + assert logger.dir == Path(".")
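
Usage sketch (illustrative, not part of the patch): Logger writes to logs/<name>_<id>.log, mirrors entries to stdout, and, outside FLASK_DEBUG, ships them to Google Cloud Logging; context entries are flattened into key=value pairs after the message. The country and policy values below are hypothetical.

    from policyengine_api.utils.logger import Logger
    from policyengine_api.utils.worker_logger import WorkerLogger

    # Local-only logger; log_to_cloud=False skips the Cloud Logging handler
    logger = Logger(dir="logs", name="api_main", log_to_cloud=False)

    # Context is appended to the message as key=value pairs, e.g.
    # "2024-01-01 12:00:00 - api_main - INFO - Enqueuing job | country_id=us policy_id=123"
    logger.log("Enqueuing job", context={"country_id": "us", "policy_id": 123})
    logger.error("Error enqueuing job", context={"error": "redis unavailable"})

    # Workers get the same behaviour with automatic ID detection:
    # WorkerLogger() resolves its ID from the current RQ job's worker name,
    # falling back to a timestamp, as worker.py does explicitly with
    # WorkerLogger(id=w.name).
    worker_logger = WorkerLogger()
    worker_logger.log("Starting CalculateEconomySimulationJob.run")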