diff --git a/Makefile b/Makefile index 02f303f7..0526f1ab 100644 --- a/Makefile +++ b/Makefile @@ -7,9 +7,6 @@ debug: test: MAX_HOUSEHOLDS=1000 pytest tests -microdata: - python policyengine_api/download_microdata.py - debug-test: MAX_HOUSEHOLDS=1000 FLASK_DEBUG=1 pytest -vv --durations=0 tests diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29b..2a3f9388 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,6 @@ +- bump: minor + changes: + removed: + - Code associated with chunking + - Code associated with separating baseline and reform calculations + - Old todo comments \ No newline at end of file diff --git a/gcp/policyengine_api/Dockerfile b/gcp/policyengine_api/Dockerfile index dbd52462..94fa5754 100644 --- a/gcp/policyengine_api/Dockerfile +++ b/gcp/policyengine_api/Dockerfile @@ -14,8 +14,7 @@ ADD . /app # Make start.sh executable RUN chmod +x /app/start.sh -RUN cd /app && make install && make microdata -RUN cd /app && make test +RUN cd /app && make install && make test # Use full path to start.sh CMD ["/bin/sh", "/app/start.sh"] diff --git a/gcp/policyengine_api/app.yaml b/gcp/policyengine_api/app.yaml index de88005e..7ab84b7d 100644 --- a/gcp/policyengine_api/app.yaml +++ b/gcp/policyengine_api/app.yaml @@ -1,9 +1,9 @@ runtime: custom env: flex resources: - cpu: 16 - memory_gb: 32 - disk_size_gb: 64 + cpu: 24 + memory_gb: 128 + disk_size_gb: 128 automatic_scaling: min_num_instances: 1 max_num_instances: 1 diff --git a/gcp/policyengine_api/start.sh b/gcp/policyengine_api/start.sh index ed995dc3..2df7b8a7 100644 --- a/gcp/policyengine_api/start.sh +++ b/gcp/policyengine_api/start.sh @@ -1,10 +1,28 @@ +#!/bin/bash + +# Environment variables +export PORT=${PORT:-8080} +export WORKER_COUNT=${WORKER_COUNT:-3} +export REDIS_PORT=${REDIS_PORT:-6379} + # Start the API gunicorn -b :$PORT policyengine_api.api --timeout 300 --workers 5 & -# Start the redis server -redis-server & -# Start the worker -python3 policyengine_api/worker.py & -python3 policyengine_api/worker.py & -python3 policyengine_api/worker.py & -python3 policyengine_api/worker.py & -python3 policyengine_api/worker.py + +# Start Redis with configuration for multiple clients +redis-server --protected-mode no \ + --maxclients 10000 \ + --timeout 0 & + +# Wait for Redis to be ready +sleep 2 + +# Start multiple workers +for (( i=1; i<=$WORKER_COUNT; i++ )) +do + echo "Starting worker $i..." + python3 policyengine_api/worker.py & +done + +# Keep the script running and handle shutdown gracefully +trap "pkill -P $$; exit 1" SIGINT SIGTERM +wait \ No newline at end of file diff --git a/policyengine_api/country.py b/policyengine_api/country.py index eef09745..10ddca6e 100644 --- a/policyengine_api/country.py +++ b/policyengine_api/country.py @@ -304,12 +304,6 @@ def build_entities(self) -> dict: data[entity.key] = entity_data return data - # 1. Remove the call to `get_all_variables` (Done) - # 2. Remove the code to check if the calculated variable is within the traced variables array (Done) - # 3. Remove the commented code block that writes to the local_database inside the for loop (Done) - # 4. Delete the code at the end of the function that writes to a file (Done) - # 5. Add code at the end of the function to write to a database (Done) - def calculate( self, household: dict, diff --git a/policyengine_api/download_microdata.py b/policyengine_api/download_microdata.py deleted file mode 100644 index 820bc3b1..00000000 --- a/policyengine_api/download_microdata.py +++ /dev/null @@ -1,15 +0,0 @@ -from policyengine_us_data import EnhancedCPS_2024, CPS_2024 -from policyengine_uk_data import EnhancedFRS_2022_23 - -DATASETS = [EnhancedCPS_2024, CPS_2024, EnhancedFRS_2022_23] - - -def download_microdata(): - for dataset in DATASETS: - dataset = dataset() - if not dataset.exists: - dataset.download() - - -if __name__ == "__main__": - download_microdata() diff --git a/policyengine_api/endpoints/economy/chunks.py b/policyengine_api/endpoints/economy/chunks.py deleted file mode 100644 index 29583720..00000000 --- a/policyengine_api/endpoints/economy/chunks.py +++ /dev/null @@ -1,51 +0,0 @@ -import time -from tqdm import tqdm -import numpy as np - - -def calc_chunks(variables=None, count_chunks=5, logger=None, sim=None): - for i in range(len(variables)): - if isinstance(variables[i], str): - variables[i] = (variables[i], sim.default_calculation_period) - variables = [ - (variable, time_period) - for variable, time_period in variables - if variable in sim.tax_benefit_system.variables - ] - if count_chunks > 1: - households = sim.calculate("household_id", 2024).values - chunk_size = len(households) // count_chunks + 1 - input_df = sim.to_input_dataframe() - - variable_data = { - variable: np.array([]) for variable, time_period in variables - } - - for i in tqdm(range(count_chunks)): - if logger is not None: - pct_complete = i / count_chunks - logger(pct_complete) - households_in_chunk = households[ - i * chunk_size : (i + 1) * chunk_size - ] - chunk_df = input_df[ - input_df["household_id__2024"].isin(households_in_chunk) - ] - - subset_sim = type(sim)(dataset=chunk_df, reform=sim.reform) - subset_sim.default_calculation_period = ( - sim.default_calculation_period - ) - - for variable, time_period in variables: - chunk_values = subset_sim.calculate( - variable, time_period - ).values - variable_data[variable] = np.concatenate( - [variable_data[variable], chunk_values] - ) - - for variable, time_period in variables: - sim.set_input(variable, time_period, variable_data[variable]) - - return sim diff --git a/policyengine_api/endpoints/economy/reform_impact.py b/policyengine_api/endpoints/economy/reform_impact.py index bcd9f9cb..31788f78 100644 --- a/policyengine_api/endpoints/economy/reform_impact.py +++ b/policyengine_api/endpoints/economy/reform_impact.py @@ -172,44 +172,25 @@ def set_reform_impact_data_routine( ), ) comment = lambda x: set_comment_on_job(x, *identifiers) + comment("Computing baseline") - baseline_economy = queue.enqueue( - compute_economy, - country_id=country_id, - policy_id=baseline_policy_id, + baseline_economy = compute_economy( + country_id, + baseline_policy_id, region=region, time_period=time_period, options=options, policy_json=baseline_policy, ) - time.sleep(3) - reform_economy = queue.enqueue( - compute_economy, - country_id=country_id, - policy_id=policy_id, + comment("Computing reform") + reform_economy = compute_economy( + country_id, + policy_id, region=region, time_period=time_period, options=options, policy_json=reform_policy, ) - while baseline_economy.get_status() in ("queued", "started"): - time.sleep(1) - while reform_economy.get_status() in ("queued", "started"): - time.sleep(1) - if reform_economy.get_status() != "finished": - reform_economy = { - "status": "error", - "message": "Error computing reform economy.", - } - else: - reform_economy = reform_economy.result - if baseline_economy.get_status() != "finished": - baseline_economy = { - "status": "error", - "message": "Error computing baseline economy.", - } - else: - baseline_economy = baseline_economy.result if baseline_economy["status"] != "ok" or reform_economy["status"] != "ok": local_database.query( "UPDATE reform_impact SET status = ?, message = ?, end_time = ?, reform_impact_json = ? WHERE country_id = ? AND reform_policy_id = ? AND baseline_policy_id = ? AND region = ? AND time_period = ? AND options_hash = ?", diff --git a/policyengine_api/endpoints/economy/single_economy.py b/policyengine_api/endpoints/economy/single_economy.py index e3dd3743..b72ac002 100644 --- a/policyengine_api/endpoints/economy/single_economy.py +++ b/policyengine_api/endpoints/economy/single_economy.py @@ -7,42 +7,13 @@ from policyengine_uk import Microsimulation import time import os -from policyengine_api.endpoints.economy.chunks import calc_chunks import traceback def compute_general_economy( simulation: Microsimulation, country_id: str = None, - simulation_type: str = None, - comment=None, ) -> dict: - variables = [ - "labor_supply_behavioral_response", - "employment_income_behavioral_response", - "household_tax", - "household_benefits", - "household_state_income_tax", - "weekly_hours_worked_behavioural_response_income_elasticity", - "weekly_hours_worked_behavioural_response_substitution_elasticity", - "household_net_income", - "household_market_income", - "in_poverty", - "in_deep_poverty", - "poverty_gap", - "deep_poverty_gap", - "income_tax", - "national_insurance", - "vat", - "council_tax", - "fuel_duty", - "tax_credits", - "universal_credit", - "child_benefit", - "state_pension", - "pension_credit", - "ni_employer", - ] total_tax = simulation.calculate("household_tax").sum() total_spending = simulation.calculate("household_benefits").sum() @@ -258,7 +229,7 @@ def compute_cliff_impact( } -def get_microsimulation( +def compute_economy( country_id: str, policy_id: str, region: str, @@ -343,41 +314,14 @@ def get_microsimulation( "person_weight" ).get_known_periods(): simulation.delete_arrays("person_weight", time_period) - print(f"Initialised simulation in {time.time() - start} seconds") - - return simulation - -def compute_economy( - country_id: str, - policy_id: str, - region: str, - time_period: str, - options: dict, - policy_json: dict, - simulation_type: str = None, - comment=None, -): - try: - simulation = get_microsimulation( - country_id, - policy_id, - region, - time_period, - options, - policy_json, - ) if options.get("target") == "cliff": return compute_cliff_impact(simulation) + print(f"Intialised simulation in {time.time() - start} seconds") start = time.time() economy = compute_general_economy( simulation, country_id=country_id, - simulation_type=simulation_type, - comment=comment, ) - except Exception as e: - print(f"Error in economy computation: {traceback.format_exc()}") - return {"status": "error", "message": str(e)} print(f"Computed economy in {time.time() - start} seconds") return {"status": "ok", "result": economy} diff --git a/tests/python/conftest.py b/tests/python/conftest.py index 17350df1..a7c052b2 100644 --- a/tests/python/conftest.py +++ b/tests/python/conftest.py @@ -7,9 +7,6 @@ import redis import pytest from policyengine_api.api import app -from policyengine_api.download_microdata import download_microdata - -download_microdata() @contextmanager @@ -34,10 +31,6 @@ def client(): with running(["redis-server"], 3): redis_client = redis.Redis() redis_client.ping() - with running( - [sys.executable, "policyengine_api/worker.py"], 3 - ), running([sys.executable, "policyengine_api/worker.py"], 3), running( - [sys.executable, "policyengine_api/worker.py"], 3 - ): + with running([sys.executable, "policyengine_api/worker.py"], 3): with app.test_client() as test_client: yield test_client