Skip to content

Commit

Permalink
Migrate ACS from policyengine-us
Browse files Browse the repository at this point in the history
Fixes #31
  • Loading branch information
PavelMakarchuk committed Sep 9, 2024
1 parent fbe0639 commit 99eaf5a
Show file tree
Hide file tree
Showing 3 changed files with 315 additions and 0 deletions.
2 changes: 2 additions & 0 deletions policyengine_us_data/datasets/acs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from policyengine_us.data.datasets.acs.raw_acs import RawACS
from policyengine_us.data.datasets.acs.acs import ACS
119 changes: 119 additions & 0 deletions policyengine_us_data/datasets/acs/acs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@

import logging
from policyengine_core.data import PublicDataset
import h5py
from policyengine_us.data.datasets.acs.raw_acs import RawACS
from policyengine_us.data.storage import policyengine_us_MICRODATA_FOLDER
from pandas import DataFrame


class ACS(PublicDataset):
    """Processed ACS microdata dataset, assembled from the raw ACS tables."""

    name = "acs"
    is_openfisca_compatible = True
    label = "ACS"
    folder_path = policyengine_us_MICRODATA_FOLDER

    def generate(self, year: int) -> None:
        """Generates the ACS dataset.

        Args:
            year (int): The year of the raw ACS to use.
        """

        # Prepare raw ACS tables, regenerating the raw data if this year
        # has not been downloaded yet.
        year = int(year)
        if year in self.years:
            self.remove(year)
        if year not in RawACS.years:
            RawACS.generate(year)

        raw_data = RawACS.load(year)
        try:
            # Open the output via a context manager (and close the raw
            # store in the finally block) so that a failure while
            # assembling the dataset does not leak open file handles.
            with h5py.File(self.file(year), mode="w") as acs:
                person, spm_unit, household = [
                    raw_data[entity]
                    for entity in ("person", "spm_unit", "household")
                ]
                # Add primary and foreign keys.
                household.SERIALNO = household.SERIALNO.astype(int)
                person.SERIALNO = person.SERIALNO.astype(int)
                person.SPORDER = person.SPORDER.astype(int)
                person.SPM_ID = person.SPM_ID.astype(int)
                spm_unit.SPM_ID = spm_unit.SPM_ID.astype(int)

                # Drop rows whose foreign keys have no match, logging how
                # complete each linkage is first.
                logging.info(
                    f"Persons with a linked household {person.SERIALNO.isin(household.SERIALNO).mean():.1%}"
                )
                person = person[person.SERIALNO.isin(household.SERIALNO)]
                logging.info(
                    f"Households with a linked person {household.SERIALNO.isin(person.SERIALNO).mean():.1%}"
                )
                household = household[
                    household.SERIALNO.isin(person.SERIALNO)
                ]
                logging.info(
                    f"SPM units with a linked person {spm_unit.SPM_ID.isin(person.SPM_ID).mean():.1%}"
                )
                spm_unit = spm_unit[spm_unit.SPM_ID.isin(person.SPM_ID)]

                add_id_variables(acs, person, spm_unit, household)
                add_person_variables(acs, person)
                add_spm_variables(acs, spm_unit)
                add_household_variables(acs, household)
        finally:
            raw_data.close()


ACS = ACS()


def add_id_variables(
    acs: h5py.File,
    person: DataFrame,
    spm_unit: DataFrame,
    household: DataFrame,
) -> None:
    """Write the entity ID and weight columns into the ACS store.

    Args:
        acs (h5py.File): The ACS dataset file.
        person (DataFrame): The person table of the ACS.
        spm_unit (DataFrame): The SPM unit table created from the person
            table of the ACS.
        household (DataFrame): The household table of the ACS.
    """
    # A person's unique ID packs the two-digit person number (SPORDER)
    # into the low digits of the household serial number.
    acs["person_id"] = person.SERIALNO * 1e2 + person.SPORDER
    acs["person_household_id"] = person.SERIALNO
    acs["person_spm_unit_id"] = person.SPM_ID
    # The ACS has no tax-unit or family tables, so the SPM unit stands in
    # for both at the person level.
    acs["person_tax_unit_id"] = person.SPM_ID
    acs["person_family_id"] = person.SPM_ID

    acs["spm_unit_id"] = spm_unit.SPM_ID
    for proxy_entity_key in ("tax_unit_id", "family_id"):
        acs[proxy_entity_key] = spm_unit.SPM_ID

    acs["household_id"] = household.SERIALNO

    # TODO: add marital unit IDs - using person IDs for now
    acs["person_marital_unit_id"] = person.SERIALNO
    acs["marital_unit_id"] = person.SERIALNO.unique()

    # Survey weights.
    acs["person_weight"] = person.PWGTP
    acs["household_weight"] = household.WGTP


def add_person_variables(acs: h5py.File, person: DataFrame) -> None:
    """Write person-level demographic and income columns into the ACS store."""
    person_columns = {
        "age": person.AGEP,
        "employment_income": person.WAGP,
        "self_employment_income": person.SEMP,
        "total_income": person.PINCP,
    }
    for name, values in person_columns.items():
        acs[name] = values


def add_spm_variables(acs: h5py.File, spm_unit: DataFrame) -> None:
    """Write SPM-unit-level resource and threshold columns into the ACS store."""
    spm_columns = {
        "spm_unit_net_income_reported": spm_unit.SPM_RESOURCES,
        "spm_unit_spm_threshold": spm_unit.SPM_POVTHRESHOLD,
    }
    for name, values in spm_columns.items():
        acs[name] = values


def add_household_variables(acs: h5py.File, household: DataFrame) -> None:
    """Write household-level columns into the ACS store."""
    acs["household_vehicles_owned"] = household.VEH
    # The state code is stored both at the household level and as the
    # state-entity column.
    state_codes = household.ST
    acs["state_fips"] = state_codes
    acs["household_state_fips"] = state_codes
194 changes: 194 additions & 0 deletions policyengine_us_data/datasets/acs/raw_acs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
from io import BytesIO
import logging
from typing import List
from zipfile import ZipFile
import pandas as pd
from policyengine_core.data import PublicDataset
import requests
from tqdm import tqdm
from policyengine_us.data.storage import policyengine_us_MICRODATA_FOLDER


# Raise the root logger to INFO so the download-progress messages emitted
# while generating the raw dataset are visible.
logging.getLogger().setLevel(logging.INFO)

# Columns to read from the ACS person file (psam_pus*.csv). Each inline
# comment is the variable's meaning in the Census PUMS data dictionary.
PERSON_COLUMNS = [
    "SERIALNO",  # Household ID
    "SPORDER",  # Person number within household
    "PWGTP",  # Person weight
    "AGEP",  # Age
    "CIT",  # Citizenship
    "MAR",  # Marital status
    "WAGP",  # Wage/salary
    "SSP",  # Social security income
    "SSIP",  # Supplemental security income
    "SEX",  # Sex
    "SEMP",  # Self-employment income
    "SCHL",  # Educational attainment
    "RETP",  # Retirement income
    "PAP",  # Public assistance income
    "OIP",  # Other income
    "PERNP",  # Total earnings
    "PINCP",  # Total income
    "POVPIP",  # Income-to-poverty line percentage
    "RAC1P",  # Race
]

# Columns to read from the ACS household file (psam_hus*.csv).
HOUSEHOLD_COLUMNS = [
    "SERIALNO",  # Household ID
    "PUMA",  # PUMA area code
    "ST",  # State code
    "ADJHSG",  # Adjustment factor for housing dollar amounts
    "ADJINC",  # Adjustment factor for income
    "WGTP",  # Household weight
    "NP",  # Number of persons in household
    "BDSP",  # Number of bedrooms
    "ELEP",  # Electricity monthly cost
    "FULP",  # Fuel monthly cost
    "GASP",  # Gas monthly cost
    "RMSP",  # Number of rooms
    "RNTP",  # Monthly rent
    "TEN",  # Tenure
    "VEH",  # Number of vehicles
    "FINCP",  # Total income
    "GRNTP",  # Gross rent
]

class RawACS(PublicDataset):
    """Raw (unprocessed) ACS PUMS and SPM research-file microdata."""

    name = "raw_acs"
    label = "Raw ACS"
    is_openfisca_compatible = False
    folder_path = policyengine_us_MICRODATA_FOLDER

    def generate(self, year: int) -> None:
        """Download the raw ACS person, household and SPM tables for a year.

        Args:
            year (int): The year of the 1-year ACS PUMS release to download.

        Raises:
            Exception: Re-raised after cleanup if any download or parse
                step fails; the partially-written HDF file is removed
                first so a failed run doesn't look like a valid year.
        """
        year = int(year)
        if year in self.years:
            self.remove(year)

        spm_url = f"https://www2.census.gov/programs-surveys/supplemental-poverty-measure/datasets/spm/spm_{year}_pu.dta"
        person_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{year}/1-Year/csv_pus.zip"
        household_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{year}/1-Year/csv_hus.zip"

        # SERIALNO values look like "2019HU0000001" (housing units) or
        # "2019GQ0000002" (group quarters). The prefixes are derived from
        # the requested year rather than hard-coded to 2019, so non-2019
        # years filter and parse correctly.
        gq_prefix = f"{year}GQ"
        hu_prefix = f"{year}HU"

        # The data dictionary for 2019 can be found here: https://www2.census.gov/programs-surveys/acs/tech_docs/pums/data_dict/PUMS_Data_Dictionary_2019.pdf

        try:
            with pd.HDFStore(RawACS.file(year)) as storage:
                # Household file
                logging.info("Downloading household file")
                household = concat_zipped_csvs(
                    household_url, "psam_hus", HOUSEHOLD_COLUMNS
                )
                # Remove group quarters (zero weight)
                household = household[
                    ~household.SERIALNO.str.contains(gq_prefix)
                ]
                household["SERIALNO"] = household["SERIALNO"].apply(
                    lambda x: int(x.replace(hu_prefix, ""))
                )
                storage["household"] = household
                # Person file
                logging.info("Downloading person file")
                person = concat_zipped_csvs(
                    person_url, "psam_pus", PERSON_COLUMNS
                )
                person = person[~person.SERIALNO.str.contains(gq_prefix)]
                person["SERIALNO"] = person["SERIALNO"].apply(
                    lambda x: int(x.replace(hu_prefix, ""))
                )
                storage["person"] = person
                # SPM unit file
                logging.info("Downloading SPM unit file")
                spm_person = pd.read_stata(spm_url).fillna(0)
                spm_person.columns = spm_person.columns.str.upper()
                create_spm_unit_table(storage, spm_person)
        except Exception as e:
            # Remove the partially-written store so it isn't mistaken for
            # a complete dataset on a later run.
            RawACS.remove(year)
            logging.error(
                f"Attempted to extract and save the CSV files, but encountered an error: {e}"
            )
            raise e


RawACS = RawACS()

def concat_zipped_csvs(
    url: str, prefix: str, columns: List[str]
) -> pd.DataFrame:
    """Downloads the ACS microdata, which is a zip file containing two halves in CSV format.

    Args:
        url (str): The URL of the data server.
        prefix (str): The prefix of the filenames, before a/b.csv.
        columns (List[str]): The columns to filter (avoids hitting memory limits).

    Returns:
        pd.DataFrame: The concatenated DataFrame, with upper-cased column
            names and NaNs filled with zero.
    """
    with BytesIO() as f:
        # Stream the download into memory, reporting progress in bytes.
        # Close both the HTTP response and the progress bar even if the
        # download fails part-way (the original leaked both).
        pbar = tqdm()
        try:
            with requests.get(url, stream=True) as req:
                for chunk in req.iter_content(chunk_size=1024):
                    if chunk:  # filter out keep-alive new chunks
                        pbar.update(len(chunk))
                        f.write(chunk)
        finally:
            pbar.close()
        f.seek(0)
        zf = ZipFile(f)
        logging.info("Loading the first half of the dataset")
        a = pd.read_csv(zf.open(prefix + "a.csv"), usecols=columns)
        logging.info("Loading the second half of the dataset")
        b = pd.read_csv(zf.open(prefix + "b.csv"), usecols=columns)
        logging.info("Concatenating datasets")
        res = pd.concat([a, b]).fillna(0)
        res.columns = res.columns.str.upper()
        return res

def create_spm_unit_table(storage: pd.HDFStore, person: pd.DataFrame) -> None:
    """Build the SPM-unit table and attach SPM unit IDs to the person table.

    Args:
        storage (pd.HDFStore): The raw ACS store; its "person" entry is
            re-written with an added SPM_ID column, and a new "spm_unit"
            entry is created.
        person (pd.DataFrame): The SPM research-file person table, with
            upper-cased column names.
    """
    SPM_UNIT_COLUMNS = [
        "CAPHOUSESUB",
        "CAPWKCCXPNS",
        "CHILDCAREXPNS",
        "EITC",
        "ENGVAL",
        "EQUIVSCALE",
        "FEDTAX",
        "FEDTAXBC",
        "FICA",
        "GEOADJ",
        "MEDXPNS",
        "NUMADULTS",
        "NUMKIDS",
        "NUMPER",
        "POOR",
        "POVTHRESHOLD",
        "RESOURCES",
        "SCHLUNCH",
        "SNAPSUB",
        "STTAX",
        "TENMORTSTATUS",
        "TOTVAL",
        "WCOHABIT",
        "WICVAL",
        "WKXPNS",
        "WUI_LT15",
        "ID",
    ]
    # Every member of an SPM unit carries identical unit-level values, so
    # the first row per SPM_ID fully describes the unit.
    unit_level_columns = [f"SPM_{column}" for column in SPM_UNIT_COLUMNS]
    spm_table = person[unit_level_columns].groupby(person.SPM_ID).first()

    original_person_table = storage["person"]
    # Ensure that join keys are the same type.
    JOIN_COLUMNS = ["SERIALNO", "SPORDER"]
    for table in (original_person_table, person):
        table[JOIN_COLUMNS] = table[JOIN_COLUMNS].astype(int)
    # Add SPM_ID from the SPM person table to the original person table.
    combined_person_table = original_person_table.merge(
        person[JOIN_COLUMNS + ["SPM_ID"]],
        on=JOIN_COLUMNS,
    )

    storage["person"] = combined_person_table
    storage["spm_unit"] = spm_table

0 comments on commit 99eaf5a

Please sign in to comment.