Add logging to API #2051

Draft: wants to merge 18 commits into master
2 changes: 2 additions & 0 deletions .gitignore
@@ -14,6 +14,8 @@ dist/*
**/*.h5
**/*.csv.gz
.env
logs/*
*.log

# Ignore generated credentials from google-github-actions/auth
gha-creds-*.json
5 changes: 5 additions & 0 deletions changelog_entry.yaml
@@ -0,0 +1,5 @@
- bump: minor
  changes:
    added:
      - Basic logging
      - Logging for worker runs
9 changes: 6 additions & 3 deletions policyengine_api/api.py
@@ -7,7 +7,7 @@
import flask
import yaml
from flask_caching import Cache
from policyengine_api.utils import make_cache_key
from policyengine_api.utils import make_cache_key, Logger
from .constants import VERSION

# from werkzeug.middleware.profiler import ProfilerMiddleware
@@ -38,7 +38,10 @@
get_simulations,
)

print("Initialising API...")
# Set up logging
logger = Logger()

logger.log("Initialising API...")

app = application = flask.Flask(__name__)

@@ -147,4 +150,4 @@ def get_specification():
return flask.jsonify(openapi_spec)


print("API initialised.")
logger.log("API initialised.")
72 changes: 63 additions & 9 deletions policyengine_api/jobs/calculate_economy_simulation_job.py
@@ -14,13 +14,16 @@
from policyengine_api.endpoints.economy.reform_impact import set_comment_on_job
from policyengine_api.constants import COUNTRY_PACKAGE_VERSIONS
from policyengine_api.country import COUNTRIES, create_policy_reform
from policyengine_api.utils import WorkerLogger
from policyengine_core.simulations import Microsimulation

from policyengine_us import Microsimulation
from policyengine_uk import Microsimulation

reform_impacts_service = ReformImpactsService()

logger = WorkerLogger()


class CalculateEconomySimulationJob(BaseJob):
def __init__(self):
@@ -38,7 +41,18 @@ def run(
baseline_policy: dict,
reform_policy: dict,
):
print(f"Starting CalculateEconomySimulationJob.run")
logger.log(
f"Starting CalculateEconomySimulationJob.run",
context={
"baseline_policy_id": baseline_policy_id,
"policy_id": policy_id,
"country_id": country_id,
"region": region,
"dataset": dataset,
"time_period": time_period,
"options": options,
},
)
try:
# Configure inputs
# Note for anyone modifying options_hash: redis-queue treats ":" as a namespace
@@ -61,7 +75,7 @@ def run(
COUNTRY_PACKAGE_VERSIONS[country_id],
)
if any(x["status"] == "ok" for x in existing):
print(f"Job already completed successfully")
logger.log(f"Job already completed successfully")
return

# Save identifiers for later commenting on processing status
@@ -75,7 +89,7 @@ def run(
options_hash,
)

print("Checking existing reform impacts...")
logger.log("Checking existing reform impacts...")
# Query existing impacts before deleting
existing = reform_impacts_service.get_all_reform_impacts(
country_id,
@@ -87,7 +101,7 @@ def run(
options_hash,
COUNTRY_PACKAGE_VERSIONS[country_id],
)
print(f"Found {len(existing)} existing impacts before delete")
logger.log(f"Found {len(existing)} existing impacts before delete")

# Delete any existing reform impact rows with the same identifiers
reform_impacts_service.delete_reform_impact(
@@ -100,7 +114,7 @@ def run(
options_hash,
)

print("Deleted existing computing impacts")
logger.log("Deleted existing computing impacts")

# Insert new reform impact row with status 'computing'
reform_impacts_service.set_reform_impact(
@@ -123,6 +137,16 @@

comment = lambda x: set_comment_on_job(x, *identifiers)
comment("Computing baseline")
logger.log(
"Computing baseline",
context={
"country_id": country_id,
"region": region,
"dataset": dataset,
"time_period": time_period,
"options": options,
},
)

# Compute baseline economy
baseline_economy = self._compute_economy(
@@ -134,6 +158,16 @@
policy_json=baseline_policy,
)
comment("Computing reform")
logger.log(
"Computing reform",
context={
"country_id": country_id,
"region": region,
"dataset": dataset,
"time_period": time_period,
"options": options,
},
)

# Compute reform economy
reform_economy = self._compute_economy(
@@ -148,6 +182,16 @@
baseline_economy = baseline_economy["result"]
reform_economy = reform_economy["result"]
comment("Comparing baseline and reform")
logger.log(
"Comparing baseline and reform",
context={
"country_id": country_id,
"region": region,
"dataset": dataset,
"time_period": time_period,
"options": options,
},
)
impact = compare_economic_outputs(
baseline_economy, reform_economy, country_id=country_id
)
@@ -175,7 +219,17 @@ def run(
options_hash,
message=traceback.format_exc(),
)
print(f"Error setting reform impact: {str(e)}")
logger.error(
f"Error setting reform impact",
context={
"country_id": country_id,
"region": region,
"dataset": dataset,
"time_period": time_period,
"options": options,
"error": str(e),
},
)
raise e

def _compute_economy(
@@ -222,7 +276,7 @@ def _compute_economy(
simulation.delete_arrays("person_weight", time_period)

if options.get("target") == "cliff":
print(f"Initialised cliff impact computation")
logger.log(f"Initialised cliff impact computation")
return {
"status": "ok",
"result": self._compute_cliff_impacts(simulation),
@@ -282,15 +336,15 @@ def _create_simulation_us(
# Second statement provides backwards compatibility option
# for running a simulation with the "enhanced_us" region
if dataset in DATASETS or region == "enhanced_us":
print(f"Running an enhanced CPS simulation")
logger.log(f"Running an enhanced CPS simulation")
from policyengine_us_data import EnhancedCPS_2024

sim_options["dataset"] = EnhancedCPS_2024

# Handle region settings; need to be mindful not to place
# legacy enhanced_us region in this block
if region not in ["us", "enhanced_us"]:
print(f"Filtering US dataset down to region {region}")
logger.log(f"Filtering US dataset down to region {region}")

from policyengine_us_data import Pooled_3_Year_CPS_2023

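
Note: `WorkerLogger`, imported from `policyengine_api.utils` at the top of this file, is likewise not shown in the excerpt. Below is a speculative standalone sketch of what a worker-side variant could look like, keeping the same `log()`/`error()` signature but stamping worker identifiers onto every entry so concurrent redis-queue job runs can be told apart; the specific fields (`host`, `pid`, `job_run_id`) are assumptions for illustration, not the PR's code.

# Hedged sketch only: a worker logger with the interface the job code above
# expects. The real WorkerLogger may track different identifiers or handlers.
import json
import logging
import os
import socket
import uuid


class WorkerLogger:
    def __init__(self, name: str = "policyengine-api-worker"):
        self._logger = logging.getLogger(name)
        if not self._logger.handlers:
            self._logger.addHandler(logging.StreamHandler())
        self._logger.setLevel(logging.INFO)
        # Identify this worker process; assumed fields for illustration.
        self.worker_context = {
            "host": socket.gethostname(),
            "pid": os.getpid(),
            "job_run_id": str(uuid.uuid4()),
        }

    def _format(self, message: str, context: dict | None = None) -> str:
        # Merge the per-call context (policy IDs, region, etc.) with the
        # fixed worker identity before serialising.
        merged = {**self.worker_context, **(context or {})}
        return f"{message} | {json.dumps(merged, default=str)}"

    def log(self, message: str, context: dict | None = None) -> None:
        self._logger.info(self._format(message, context))

    def error(self, message: str, context: dict | None = None) -> None:
        self._logger.error(self._format(message, context))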
20 changes: 20 additions & 0 deletions policyengine_api/routes/economy_routes.py
@@ -5,14 +5,25 @@
from policyengine_api.constants import COUNTRY_PACKAGE_VERSIONS
from flask import request, Response
import json
from policyengine_api.utils import Logger

economy_bp = Blueprint("economy", __name__)
economy_service = EconomyService()

logger = Logger()


@validate_country
@economy_bp.route("/<policy_id>/over/<baseline_policy_id>", methods=["GET"])
def get_economic_impact(country_id, policy_id, baseline_policy_id):
logger.log(
f"GET request received for get_economic_impact",
context={
"country_id": country_id,
"policy_id": policy_id,
"baseline_policy_id": baseline_policy_id,
},
)

policy_id = int(policy_id or get_current_law_policy_id(country_id))
baseline_policy_id = int(
@@ -43,6 +54,15 @@ def get_economic_impact(country_id, policy_id, baseline_policy_id):
)
return result
except Exception as e:
logger.error(
f"Error within get_economic_impact",
context={
"country_id": country_id,
"policy_id": policy_id,
"baseline_policy_id": baseline_policy_id,
"error": str(e),
},
)
return Response(
{
"status": "error",
8 changes: 7 additions & 1 deletion policyengine_api/routes/simulation_analysis_routes.py
@@ -8,15 +8,18 @@
validate_sim_analysis_payload,
validate_country,
)
from policyengine_api.utils.logger import Logger

simulation_analysis_bp = Blueprint("simulation_analysis", __name__)
simulation_analysis_service = SimulationAnalysisService()

logger = Logger()


@simulation_analysis_bp.route("", methods=["POST"])
@validate_country
def execute_simulation_analysis(country_id):
print("Got POST request for simulation analysis")
logger.log("Got POST request for simulation analysis")

# Pop items from request payload and validate
# where necessary
@@ -68,6 +71,9 @@ def execute_simulation_analysis(country_id):

return response
except Exception as e:
logger.error(
f"Error while executing simulation analysis; details: {str(e)}"
)
return Response(
json.dumps(
{
23 changes: 23 additions & 0 deletions policyengine_api/routes/tracer_analysis_routes.py
@@ -3,6 +3,7 @@
validate_country,
validate_tracer_analysis_payload,
)
from policyengine_api.utils.logger import Logger
from policyengine_api.services.tracer_analysis_service import (
TracerAnalysisService,
)
@@ -11,10 +12,13 @@
tracer_analysis_bp = Blueprint("tracer_analysis", __name__)
tracer_analysis_service = TracerAnalysisService()

logger = Logger()


@tracer_analysis_bp.route("", methods=["POST"])
@validate_country
def execute_tracer_analysis(country_id):
logger.log("Got POST request for tracer analysis")

payload = request.json

@@ -51,6 +55,15 @@ def execute_tracer_analysis(country_id):
"""
This exception is raised when the tracer can't find a household tracer record
"""
logger.log(
f"No household simulation tracer found",
context={
"country_id": country_id,
"household_id": household_id,
"policy_id": policy_id,
"variable": variable,
},
)
return Response(
json.dumps(
{
@@ -62,6 +75,16 @@
404,
)
except Exception as e:
logger.error(
f"Error while executing tracer analysis",
context={
"country_id": country_id,
"household_id": household_id,
"policy_id": policy_id,
"variable": variable,
"error": str(e),
},
)
return Response(
json.dumps(
{
8 changes: 8 additions & 0 deletions policyengine_api/services/ai_analysis_service.py
@@ -4,6 +4,9 @@
import time
from typing import Generator, Optional
from policyengine_api.data import local_database
from policyengine_api.utils.logger import Logger

logger = Logger()


class AIAnalysisService:
@@ -19,13 +22,15 @@ def get_existing_analysis(
"""
Get existing analysis from the local database
"""
logger.log(f"Getting existing analysis")

analysis = local_database.query(
f"SELECT analysis FROM analysis WHERE prompt = ?",
(prompt,),
).fetchone()

if analysis is None:
logger.log(f"No existing analysis found")
return None

def generate():
@@ -46,6 +51,7 @@ def generate():
return generate()

def trigger_ai_analysis(self, prompt: str) -> Generator[str, None, None]:
logger.log("Triggering AI analysis")

# Configure a Claude client
claude_client = anthropic.Anthropic(
@@ -83,6 +89,8 @@ def generate():
if buffer:
yield json.dumps({"stream": buffer}) + "\n"

logger.log("Updating analysis record")

# Finally, update the analysis record and return
local_database.query(
f"INSERT INTO analysis (prompt, analysis, status) VALUES (?, ?, ?)",