From d9146dd5a75ca9c8cf0cfb8196f7f1284b68dfac Mon Sep 17 00:00:00 2001
From: PavelMakarchuk
Date: Mon, 9 Sep 2024 23:03:28 +0200
Subject: [PATCH] Migrate ACS from policyengine-us

Fixes #31
---
 policyengine_us_data/datasets/acs/__init__.py |   2 +
 policyengine_us_data/datasets/acs/acs.py      | 118 +++++++++++
 policyengine_us_data/datasets/acs/raw_acs.py  | 198 ++++++++++++++++++
 3 files changed, 318 insertions(+)
 create mode 100644 policyengine_us_data/datasets/acs/__init__.py
 create mode 100644 policyengine_us_data/datasets/acs/acs.py
 create mode 100644 policyengine_us_data/datasets/acs/raw_acs.py

diff --git a/policyengine_us_data/datasets/acs/__init__.py b/policyengine_us_data/datasets/acs/__init__.py
new file mode 100644
index 0000000..5656048
--- /dev/null
+++ b/policyengine_us_data/datasets/acs/__init__.py
@@ -0,0 +1,2 @@
+from policyengine_us_data.datasets.acs.raw_acs import RawACS
+from policyengine_us_data.datasets.acs.acs import ACS
diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py
new file mode 100644
index 0000000..e5b1646
--- /dev/null
+++ b/policyengine_us_data/datasets/acs/acs.py
@@ -0,0 +1,118 @@
+import logging
+from policyengine_core.data import PublicDataset
+import h5py
+from policyengine_us_data.datasets.acs.raw_acs import RawACS
+from policyengine_us_data.storage import policyengine_us_MICRODATA_FOLDER
+from pandas import DataFrame
+
+
+class ACS(PublicDataset):
+    name = "acs"
+    is_openfisca_compatible = True
+    label = "ACS"
+    folder_path = policyengine_us_MICRODATA_FOLDER
+
+    def generate(self, year: int) -> None:
+        """Generates the ACS dataset.
+
+        Args:
+            year (int): The year of the raw ACS to use.
+        """
+
+        # Prepare raw ACS tables
+        year = int(year)
+        if year in self.years:
+            self.remove(year)
+        if year not in RawACS.years:
+            RawACS.generate(year)
+
+        raw_data = RawACS.load(year)
+        acs = h5py.File(ACS.file(year), mode="w")
+
+        person, spm_unit, household = [
+            raw_data[entity] for entity in ("person", "spm_unit", "household")
+        ]
+        # Add primary and foreign keys
+
+        household.SERIALNO = household.SERIALNO.astype(int)
+        person.SERIALNO = person.SERIALNO.astype(int)
+        person.SPORDER = person.SPORDER.astype(int)
+        person.SPM_ID = person.SPM_ID.astype(int)
+        spm_unit.SPM_ID = spm_unit.SPM_ID.astype(int)
+
+        logging.info(
+            f"Persons with a linked household {person.SERIALNO.isin(household.SERIALNO).mean():.1%}"
+        )
+        person = person[person.SERIALNO.isin(household.SERIALNO)]
+        logging.info(
+            f"Households with a linked person {household.SERIALNO.isin(person.SERIALNO).mean():.1%}"
+        )
+        household = household[household.SERIALNO.isin(person.SERIALNO)]
+        logging.info(
+            f"SPM units with a linked person {spm_unit.SPM_ID.isin(person.SPM_ID).mean():.1%}"
+        )
+        spm_unit = spm_unit[spm_unit.SPM_ID.isin(person.SPM_ID)]
+
+        add_id_variables(acs, person, spm_unit, household)
+        add_person_variables(acs, person)
+        add_spm_variables(acs, spm_unit)
+        add_household_variables(acs, household)
+
+        raw_data.close()
+        acs.close()
+
+
+ACS = ACS()
+
+
+def add_id_variables(
+    acs: h5py.File,
+    person: DataFrame,
+    spm_unit: DataFrame,
+    household: DataFrame,
+) -> None:
+    """Add basic ID and weight variables.
+
+    Args:
+        acs (h5py.File): The ACS dataset file.
+        person (DataFrame): The person table of the ACS.
+        spm_unit (DataFrame): The SPM unit table created from the person table
+            of the ACS.
+        household (DataFrame): The household table of the ACS.
+    """
+    acs["person_id"] = person.SERIALNO * 1e2 + person.SPORDER
+    acs["person_spm_unit_id"] = person.SPM_ID
+    acs["spm_unit_id"] = spm_unit.SPM_ID
+    # ACS doesn't have tax units.
+    acs["tax_unit_id"] = spm_unit.SPM_ID
+    # Until we add a family table, we'll use the person table.
+    acs["family_id"] = spm_unit.SPM_ID
+    acs["person_household_id"] = person.SERIALNO
+    acs["person_tax_unit_id"] = person.SPM_ID
+    acs["person_family_id"] = person.SPM_ID
+    acs["household_id"] = household.SERIALNO
+
+    # TODO: add marital unit IDs - using person IDs for now
+    acs["person_marital_unit_id"] = person.SERIALNO
+    acs["marital_unit_id"] = person.SERIALNO.unique()
+
+    # Add weights
+    acs["person_weight"] = person.PWGTP
+    acs["household_weight"] = household.WGTP
+
+
+def add_person_variables(acs: h5py.File, person: DataFrame) -> None:
+    acs["age"] = person.AGEP
+    acs["employment_income"] = person.WAGP
+    acs["self_employment_income"] = person.SEMP
+    acs["total_income"] = person.PINCP
+
+
+def add_spm_variables(acs: h5py.File, spm_unit: DataFrame) -> None:
+    acs["spm_unit_net_income_reported"] = spm_unit.SPM_RESOURCES
+    acs["spm_unit_spm_threshold"] = spm_unit.SPM_POVTHRESHOLD
+
+
+def add_household_variables(acs: h5py.File, household: DataFrame) -> None:
+    acs["household_vehicles_owned"] = household.VEH
+    acs["state_fips"] = acs["household_state_fips"] = household.ST
diff --git a/policyengine_us_data/datasets/acs/raw_acs.py b/policyengine_us_data/datasets/acs/raw_acs.py
new file mode 100644
index 0000000..5104cd6
--- /dev/null
+++ b/policyengine_us_data/datasets/acs/raw_acs.py
@@ -0,0 +1,198 @@
+from io import BytesIO
+import logging
+from typing import List
+from zipfile import ZipFile
+import pandas as pd
+from policyengine_core.data import PublicDataset
+import requests
+from tqdm import tqdm
+from policyengine_us_data.storage import policyengine_us_MICRODATA_FOLDER
+
+
+logging.getLogger().setLevel(logging.INFO)
+
+PERSON_COLUMNS = [
+    "SERIALNO",  # Household ID
+    "SPORDER",  # Person number within household
+    "PWGTP",  # Person weight
+    "AGEP",  # Age
+    "CIT",  # Citizenship
+    "MAR",  # Marital status
+    "WAGP",  # Wage/salary
+    "SSP",  # Social security income
+    "SSIP",  # Supplemental security income
+    "SEX",  # Sex
+    "SEMP",  # Self-employment income
+    "SCHL",  # Educational attainment
+    "RETP",  # Retirement income
+    "PAP",  # Public assistance income
+    "OIP",  # Other income
+    "PERNP",  # Total earnings
+    "PINCP",  # Total income
+    "POVPIP",  # Income-to-poverty line percentage
+    "RAC1P",  # Race
+]
+
+HOUSEHOLD_COLUMNS = [
+    "SERIALNO",  # Household ID
+    "PUMA",  # PUMA area code
+    "ST",  # State code
+    "ADJHSG",  # Adjustment factor for housing dollar amounts
+    "ADJINC",  # Adjustment factor for income
+    "WGTP",  # Household weight
+    "NP",  # Number of persons in household
+    "BDSP",  # Number of bedrooms
+    "ELEP",  # Electricity monthly cost
+    "FULP",  # Fuel monthly cost
+    "GASP",  # Gas monthly cost
+    "RMSP",  # Number of rooms
+    "RNTP",  # Monthly rent
+    "TEN",  # Tenure
+    "VEH",  # Number of vehicles
+    "FINCP",  # Total income
+    "GRNTP",  # Gross rent
+]
+
+
+class RawACS(PublicDataset):
+    name = "raw_acs"
+    label = "Raw ACS"
+    is_openfisca_compatible = False
+    folder_path = policyengine_us_MICRODATA_FOLDER
+
+    def generate(self, year: int) -> None:
+        year = int(year)
+        if year in self.years:
+            self.remove(year)
+
+        spm_url = f"https://www2.census.gov/programs-surveys/supplemental-poverty-measure/datasets/spm/spm_{year}_pu.dta"
+        person_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{year}/1-Year/csv_pus.zip"
+        household_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{year}/1-Year/csv_hus.zip"
+
+        # The data dictionary for 2019 can be found here: https://www2.census.gov/programs-surveys/acs/tech_docs/pums/data_dict/PUMS_Data_Dictionary_2019.pdf
+
+        try:
+            with pd.HDFStore(RawACS.file(year)) as storage:
+                # Household file
+                logging.info(f"Downloading household file")
+                household = concat_zipped_csvs(
+                    household_url, "psam_hus", HOUSEHOLD_COLUMNS
+                )
+                # Remove group quarters (zero weight); SERIALNO is prefixed
+                # with the sample year plus GQ/HU, so derive the prefix from
+                # the requested year rather than hard-coding 2019.
+                household = household[
+                    ~household.SERIALNO.str.contains(f"{year}GQ")
+                ]
+                household["SERIALNO"] = household["SERIALNO"].apply(
+                    lambda x: int(x.replace(f"{year}HU", ""))
+                )
+                storage["household"] = household
+                # Person file
+                logging.info(f"Downloading person file")
+                person = concat_zipped_csvs(
+                    person_url, "psam_pus", PERSON_COLUMNS
+                )
+                person = person[~person.SERIALNO.str.contains(f"{year}GQ")]
+                person["SERIALNO"] = person["SERIALNO"].apply(
+                    lambda x: int(x.replace(f"{year}HU", ""))
+                )
+                storage["person"] = person
+                # SPM unit file
+                logging.info(f"Downloading SPM unit file")
+                spm_person = pd.read_stata(spm_url).fillna(0)
+                spm_person.columns = spm_person.columns.str.upper()
+                create_spm_unit_table(storage, spm_person)
+        except Exception as e:
+            RawACS.remove(year)
+            logging.error(
+                f"Attempted to extract and save the CSV files, but encountered an error: {e}"
+            )
+            raise e
+
+
+RawACS = RawACS()
+
+
+def concat_zipped_csvs(
+    url: str, prefix: str, columns: List[str]
+) -> pd.DataFrame:
+    """Downloads the ACS microdata, which is a zip file containing two halves in CSV format.
+
+    Args:
+        url (str): The URL of the data server.
+        prefix (str): The prefix of the filenames, before a/b.csv.
+        columns (List[str]): The columns to filter (avoids hitting memory limits).
+
+    Returns:
+        pd.DataFrame: The concatenated DataFrame.
+    """
+    req = requests.get(url, stream=True)
+    with BytesIO() as f:
+        pbar = tqdm()
+        for chunk in req.iter_content(chunk_size=1024):
+            if chunk:  # filter out keep-alive new chunks
+                pbar.update(len(chunk))
+                f.write(chunk)
+        f.seek(0)
+        zf = ZipFile(f)
+        logging.info(f"Loading the first half of the dataset")
+        a = pd.read_csv(zf.open(prefix + "a.csv"), usecols=columns)
+        logging.info(f"Loading the second half of the dataset")
+        b = pd.read_csv(zf.open(prefix + "b.csv"), usecols=columns)
+        logging.info(f"Concatenating datasets")
+        res = pd.concat([a, b]).fillna(0)
+        res.columns = res.columns.str.upper()
+        return res
+
+
+def create_spm_unit_table(storage: pd.HDFStore, person: pd.DataFrame) -> None:
+    SPM_UNIT_COLUMNS = [
+        "CAPHOUSESUB",
+        "CAPWKCCXPNS",
+        "CHILDCAREXPNS",
+        "EITC",
+        "ENGVAL",
+        "EQUIVSCALE",
+        "FEDTAX",
+        "FEDTAXBC",
+        "FICA",
+        "GEOADJ",
+        "MEDXPNS",
+        "NUMADULTS",
+        "NUMKIDS",
+        "NUMPER",
+        "POOR",
+        "POVTHRESHOLD",
+        "RESOURCES",
+        "SCHLUNCH",
+        "SNAPSUB",
+        "STTAX",
+        "TENMORTSTATUS",
+        "TOTVAL",
+        "WCOHABIT",
+        "WICVAL",
+        "WKXPNS",
+        "WUI_LT15",
+        "ID",
+    ]
+    spm_table = (
+        person[["SPM_" + column for column in SPM_UNIT_COLUMNS]]
+        .groupby(person.SPM_ID)
+        .first()
+    )
+
+    original_person_table = storage["person"]
+
+    # Ensure that join keys are the same type.
+    JOIN_COLUMNS = ["SERIALNO", "SPORDER"]
+    original_person_table[JOIN_COLUMNS] = original_person_table[
+        JOIN_COLUMNS
+    ].astype(int)
+    person[JOIN_COLUMNS] = person[JOIN_COLUMNS].astype(int)
+
+    # Add SPM_ID from the SPM person table to the original person table.
+    combined_person_table = pd.merge(
+        original_person_table,
+        person[JOIN_COLUMNS + ["SPM_ID"]],
+        on=JOIN_COLUMNS,
+    )
+
+    storage["person"] = combined_person_table
+    storage["spm_unit"] = spm_table