Add ACS, rent and property taxes and 3-year CPS (#35)
* Migrate ACS from policyengine-us
Fixes #31

* populate acs

* Update PolicyEngine US data

* format

* data fix

* test

* changelog

* Update PolicyEngine US data

* remove extra

* changelog

* Update PolicyEngine US data

* readme file

* property tax

* changelog

* Update PolicyEngine US data

* format

* changelog

* Pool 3 CPS years
Fixes #66

* Upload ECPS result in PRs

* Feed into ECPS

* Bump version and ECPS file

* changelog

* Move back to old ECPS

* init

* storage

* Fix imports

* Move versioning back

* Add URL for ACS 2022

* Add QRF rewrite and full imputations

* Add calibration

* Shift to branch of US

* Make optional install

* Generate ACS before CPS

* What a silly error

* Minor improvements

* Fix bugs

* Adjust QRF to enable single-output predictions

* Fix bug in QRF

---------

Co-authored-by: Github Actions[bot] <[email protected]>
Co-authored-by: Nikhil Woodruff <[email protected]>
3 people authored Sep 23, 2024
1 parent 659fac0 commit 4e1d1e0
Showing 20 changed files with 634 additions and 66 deletions.
9 changes: 6 additions & 3 deletions .github/workflows/pull_request.yaml
@@ -67,9 +67,12 @@ jobs:
POLICYENGINE_US_DATA_GITHUB_TOKEN: ${{ secrets.POLICYENGINE_US_DATA_GITHUB_TOKEN }}
- name: Build datasets
run: make data
env:
TEST_LITE: true
- name: Run tests
run: pytest
- name: Test documentation builds
run: make documentation
run: make documentation
- name: Upload ECPS 2024
uses: actions/upload-artifact@v4
with:
name: enhanced_cps_2024.h5
path: policyengine_us_data/storage/enhanced_cps_2024.h5
2 changes: 2 additions & 0 deletions Makefile
@@ -32,7 +32,9 @@ documentation:
jb clean docs && jb build docs

data:
python policyengine_us_data/datasets/acs/acs.py
python policyengine_us_data/datasets/cps/cps.py
python policyengine_us_data/datasets/cps/extended_cps.py
python policyengine_us_data/datasets/cps/enhanced_cps.py

clean:
6 changes: 6 additions & 0 deletions changelog_entry.yaml
@@ -0,0 +1,6 @@
- bump: minor
changes:
added:
- Migrate the ACS from the policyengine-us repository.
changed:
- Enhanced CPS now uses a 3-year pooled CPS.
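The changelog notes that the Enhanced CPS now builds on a 3-year pooled CPS (the Pooled_3_Year_CPS_2023 dataset registered below). The pooling code itself is not shown in this diff, so the following is only a rough sketch of the general idea; the function name and the rule of dividing weights by the number of pooled years are illustrative assumptions, not the repository's implementation.

# Illustrative sketch only -- not the repository's Pooled_3_Year_CPS_2023 code.
# General idea: stack several CPS years and rescale household weights so the
# pooled file still represents a single year's population.
import numpy as np


def pool_cps_years(years: list[dict[str, np.ndarray]]) -> dict[str, np.ndarray]:
    """Concatenate same-named arrays from several CPS years."""
    pooled = {
        name: np.concatenate([year[name] for year in years])
        for name in years[0]
    }
    # Assumed rescaling rule: divide weights by the number of pooled years.
    pooled["household_weight"] = pooled["household_weight"] / len(years)
    return pooled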
4 changes: 3 additions & 1 deletion policyengine_us_data/datasets/__init__.py
@@ -5,6 +5,7 @@
CPS_2022,
CPS_2023,
CPS_2024,
Pooled_3_Year_CPS_2023,
CensusCPS_2018,
CensusCPS_2019,
CensusCPS_2020,
@@ -15,5 +16,6 @@
ReweightedCPS_2024,
)
from .puf import PUF_2015, PUF_2021, PUF_2024, IRS_PUF_2015
from .acs import ACS_2022

DATASETS = [CPS_2022, PUF_2021, CPS_2024, EnhancedCPS_2024]
DATASETS = [CPS_2022, PUF_2021, CPS_2024, EnhancedCPS_2024, ACS_2022]
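With ACS_2022 now registered in DATASETS, every registered dataset can in principle be generated in a loop. A minimal sketch, assuming each class follows the ACS_2022().generate() pattern shown later in this commit; in practice the Makefile calls the per-dataset scripts directly, generating the ACS before the CPS.

# Sketch only: generate every registered dataset in list order.
# Assumes each class exposes generate() like ACS_2022 below; the Makefile
# instead runs the individual dataset scripts in a fixed order.
from policyengine_us_data.datasets import DATASETS

for dataset_class in DATASETS:
    dataset_class().generate()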
6 changes: 6 additions & 0 deletions policyengine_us_data/datasets/acs/README.md
@@ -0,0 +1,6 @@
2022 ACS 1 Year Data Dictionary:
https://www2.census.gov/programs-surveys/acs/tech_docs/pums/data_dict/PUMS_Data_Dictionary_2022.pdf
User Guide:
https://www2.census.gov/programs-surveys/acs/tech_docs/pums/2022ACS_PUMS_User_Guide.pdf
PUMS Documentation:
https://www.census.gov/programs-surveys/acs/microdata/documentation.html
2 changes: 2 additions & 0 deletions policyengine_us_data/datasets/acs/__init__.py
@@ -0,0 +1,2 @@
from .acs import *
from .census_acs import *
118 changes: 118 additions & 0 deletions policyengine_us_data/datasets/acs/acs.py
@@ -0,0 +1,118 @@
import logging
from policyengine_core.data import Dataset
import h5py
from policyengine_us_data.datasets.acs.census_acs import CensusACS_2022
from policyengine_us_data.storage import STORAGE_FOLDER
from pandas import DataFrame
import numpy as np
import pandas as pd


class ACS(Dataset):
data_format = Dataset.ARRAYS
time_period = None
census_acs = None

def generate(self) -> None:
"""Generates the ACS dataset."""

raw_data = self.census_acs(require=True).load()
acs = h5py.File(self.file_path, mode="w")
person, household = [
raw_data[entity] for entity in ("person", "household")
]

self.add_id_variables(acs, person, household)
self.add_person_variables(acs, person, household)
self.add_household_variables(acs, household)

acs.close()
raw_data.close()

@staticmethod
def add_id_variables(
acs: h5py.File,
person: DataFrame,
household: DataFrame,
) -> None:
# Create numeric IDs based on SERIALNO
h_id_to_number = pd.Series(
np.arange(len(household)), index=household["SERIALNO"]
)
household["household_id"] = h_id_to_number[
household["SERIALNO"]
].values
person["household_id"] = h_id_to_number[person["SERIALNO"]].values
person["person_id"] = person.index + 1

acs["person_id"] = person["person_id"]
acs["household_id"] = household["household_id"]
acs["spm_unit_id"] = acs["household_id"]
acs["tax_unit_id"] = acs["household_id"]
acs["family_id"] = acs["household_id"]
acs["marital_unit_id"] = acs["household_id"]
acs["person_household_id"] = person["household_id"]
acs["person_spm_unit_id"] = person["household_id"]
acs["person_tax_unit_id"] = person["household_id"]
acs["person_family_id"] = person["household_id"]
acs["person_marital_unit_id"] = person["household_id"]
acs["household_weight"] = household.WGTP

@staticmethod
def add_person_variables(
acs: h5py.File, person: DataFrame, household: DataFrame
) -> None:
acs["age"] = person.AGEP
acs["is_male"] = person.SEX == 1
acs["employment_income"] = person.WAGP
acs["self_employment_income"] = person.SEMP
acs["social_security"] = person.SSP
acs["taxable_private_pension_income"] = person.RETP
person[["rent", "real_estate_taxes"]] = (
household.set_index("household_id")
.loc[person["household_id"]][["RNTP", "TAXAMT"]]
.values
)
acs["is_household_head"] = person.SPORDER == 1
factor = person.SPORDER == 1
person.rent *= factor * 12
person.real_estate_taxes *= factor
acs["rent"] = person.rent
acs["real_estate_taxes"] = person.real_estate_taxes
acs["tenure_type"] = (
household.TEN.astype(int)
.map(
{
1: "OWNED_WITH_MORTGAGE",
2: "OWNED_OUTRIGHT",
3: "RENTED",
}
)
.fillna("NONE")
.astype("S")
)

@staticmethod
def add_spm_variables(acs: h5py.File, spm_unit: DataFrame) -> None:
acs["spm_unit_net_income_reported"] = spm_unit.SPM_RESOURCES
acs["spm_unit_spm_threshold"] = spm_unit.SPM_POVTHRESHOLD

@staticmethod
def add_household_variables(acs: h5py.File, household: DataFrame) -> None:
acs["household_vehicles_owned"] = household.VEH
acs["state_fips"] = acs["household_state_fips"] = household.ST.astype(
int
)


class ACS_2022(ACS):
name = "acs_2022"
label = "ACS 2022"
time_period = 2022
file_path = STORAGE_FOLDER / "acs_2022.h5"
census_acs = CensusACS_2022
url = "release://PolicyEngine/policyengine-us-data/release/acs_2022.h5"


if __name__ == "__main__":
ACS_2022().generate()
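Once ACS_2022().generate() has run, the resulting file can be inspected directly. A minimal sketch using h5py, with the file path and variable names taken from the code above; the printed summary is purely illustrative.

# Illustration only: inspect the generated ACS 2022 file.
import h5py

from policyengine_us_data.storage import STORAGE_FOLDER

with h5py.File(STORAGE_FOLDER / "acs_2022.h5", "r") as f:
    print(sorted(f.keys()))  # age, household_id, person_id, rent, ...
    ages = f["age"][:]
    weights = f["household_weight"][:]
    print(f"{len(ages):,} person records, {len(weights):,} households")
    print(f"weighted household count: {weights.sum():,.0f}")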
208 changes: 208 additions & 0 deletions policyengine_us_data/datasets/acs/census_acs.py
@@ -0,0 +1,208 @@
from io import BytesIO
import logging
from typing import List
from zipfile import ZipFile
import pandas as pd
from policyengine_core.data import Dataset
import requests
from tqdm import tqdm
from policyengine_us_data.storage import STORAGE_FOLDER

logging.getLogger().setLevel(logging.INFO)

PERSON_COLUMNS = [
"SERIALNO", # Household ID
"SPORDER", # Person number within household
"PWGTP", # Person weight
"AGEP", # Age
"CIT", # Citizenship
"MAR", # Marital status
"WAGP", # Wage/salary
"SSP", # Social security income
"SSIP", # Supplemental security income
"SEX", # Sex
"SEMP", # Self-employment income
"SCHL", # Educational attainment
"RETP", # Retirement income
"PAP", # Public assistance income
"OIP", # Other income
"PERNP", # Total earnings
"PINCP", # Total income
"POVPIP", # Income-to-poverty line percentage
"RAC1P", # Race
]

HOUSEHOLD_COLUMNS = [
"SERIALNO", # Household ID
"PUMA", # PUMA area code
"ST", # State code
"ADJHSG", # Adjustment factor for housing dollar amounts
"ADJINC", # Adjustment factor for income
"WGTP", # Household weight
"NP", # Number of persons in household
"BDSP", # Number of bedrooms
"ELEP", # Electricity monthly cost
"FULP", # Fuel monthly cost
"GASP", # Gas monthly cost
"RMSP", # Number of rooms
"RNTP", # Monthly rent
"TEN", # Tenure
"VEH", # Number of vehicles
"FINCP", # Total income
"GRNTP", # Gross rent
"TAXAMT", # Property taxes
]


class CensusACS(Dataset):
data_format = Dataset.TABLES

def generate(self) -> None:
spm_url = f"https://www2.census.gov/programs-surveys/supplemental-poverty-measure/datasets/spm/spm_{self.time_period}_pu.dta"
person_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{self.time_period}/1-Year/csv_pus.zip"
household_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{self.time_period}/1-Year/csv_hus.zip"

with pd.HDFStore(self.file_path, mode="w") as storage:
household = self.process_household_data(
household_url, "psam_hus", HOUSEHOLD_COLUMNS
)
person = self.process_person_data(
person_url, "psam_pus", PERSON_COLUMNS
)
person = person[person.SERIALNO.isin(household.SERIALNO)]
household = household[household.SERIALNO.isin(person.SERIALNO)]
storage["household"] = household
storage["person"] = person

@staticmethod
def process_household_data(
url: str, prefix: str, columns: List[str]
) -> pd.DataFrame:
req = requests.get(url, stream=True)
with BytesIO() as f:
pbar = tqdm()
for chunk in req.iter_content(chunk_size=1024):
if chunk:
pbar.update(len(chunk))
f.write(chunk)
f.seek(0)
zf = ZipFile(f)
a = pd.read_csv(
zf.open(prefix + "a.csv"),
usecols=columns,
dtype={"SERIALNO": str},
)
b = pd.read_csv(
zf.open(prefix + "b.csv"),
usecols=columns,
dtype={"SERIALNO": str},
)
res = pd.concat([a, b]).fillna(0)
res.columns = res.columns.str.upper()

# Ensure correct data types
res["ST"] = res["ST"].astype(int)

return res

@staticmethod
def process_person_data(
url: str, prefix: str, columns: List[str]
) -> pd.DataFrame:
req = requests.get(url, stream=True)
with BytesIO() as f:
pbar = tqdm()
for chunk in req.iter_content(chunk_size=1024):
if chunk:
pbar.update(len(chunk))
f.write(chunk)
f.seek(0)
zf = ZipFile(f)
a = pd.read_csv(
zf.open(prefix + "a.csv"),
usecols=columns,
dtype={"SERIALNO": str},
)
b = pd.read_csv(
zf.open(prefix + "b.csv"),
usecols=columns,
dtype={"SERIALNO": str},
)
res = pd.concat([a, b]).fillna(0)
res.columns = res.columns.str.upper()

# Ensure correct data types
res["SPORDER"] = res["SPORDER"].astype(int)

return res

@staticmethod
def create_spm_unit_table(
storage: pd.HDFStore, person: pd.DataFrame
) -> None:
SPM_UNIT_COLUMNS = [
"CAPHOUSESUB",
"CAPWKCCXPNS",
"CHILDCAREXPNS",
"EITC",
"ENGVAL",
"EQUIVSCALE",
"FEDTAX",
"FEDTAXBC",
"FICA",
"GEOADJ",
"MEDXPNS",
"NUMADULTS",
"NUMKIDS",
"NUMPER",
"POOR",
"POVTHRESHOLD",
"RESOURCES",
"SCHLUNCH",
"SNAPSUB",
"STTAX",
"TENMORTSTATUS",
"TOTVAL",
"WCOHABIT",
"WICVAL",
"WKXPNS",
"WUI_LT15",
"ID",
]
spm_table = (
person[["SPM_" + column for column in SPM_UNIT_COLUMNS]]
.groupby(person.SPM_ID)
.first()
)

original_person_table = storage["person"]
original_person_table.to_csv("person.csv")
person.to_csv("spm_person.csv")

# Ensure SERIALNO is treated as string
JOIN_COLUMNS = ["SERIALNO", "SPORDER"]
original_person_table["SERIALNO"] = original_person_table[
"SERIALNO"
].astype(str)
original_person_table["SPORDER"] = original_person_table[
"SPORDER"
].astype(int)
person["SERIALNO"] = person["SERIALNO"].astype(str)
person["SPORDER"] = person["SPORDER"].astype(int)

# Add SPM_ID from the SPM person table to the original person table.
combined_person_table = pd.merge(
original_person_table,
person[JOIN_COLUMNS + ["SPM_ID"]],
on=JOIN_COLUMNS,
)

storage["person_matched"] = combined_person_table
storage["spm_unit"] = spm_table


class CensusACS_2022(CensusACS):
label = "Census ACS (2022)"
name = "census_acs_2022.h5"
file_path = STORAGE_FOLDER / "census_acs_2022.h5"
time_period = 2022
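As a usage sketch (not part of the commit), the intermediate tables written by CensusACS_2022().generate() can be read back with pandas; the table names and file path come from the class above, and the chosen columns are just an illustration.

# Illustration only: read the intermediate Census ACS tables back.
import pandas as pd

from policyengine_us_data.datasets.acs.census_acs import CensusACS_2022
from policyengine_us_data.storage import STORAGE_FOLDER

CensusACS_2022().generate()  # downloads and stores the 2022 1-year PUMS

with pd.HDFStore(str(STORAGE_FOLDER / "census_acs_2022.h5"), mode="r") as store:
    household = store["household"]
    person = store["person"]
    print(household[["SERIALNO", "WGTP", "RNTP", "TAXAMT"]].head())
    print(f"{len(person):,} persons across {len(household):,} households")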