Skip to content

Commit

Permalink
chore: documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobdadams committed May 30, 2024
1 parent 7484911 commit 39a275a
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 22 deletions.
14 changes: 9 additions & 5 deletions src/wmrc/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ def add_bogus_geometries(input_dataframe: pd.DataFrame) -> pd.DataFrame:


class SalesForceRecords:
"""A helper class that extracts data from Salesforce based on fields from WMRC's manual reports. Provides access to
the data through the .df attribute along with the field mapping and list of counties.
"""

def __init__(self, salesforce_extractor: palletjack.extract.SalesforceRestLoader):
self.salesforce_extractor = salesforce_extractor
Expand Down Expand Up @@ -185,7 +188,7 @@ def county_summaries(year_df: pd.DataFrame, county_fields: list[str]) -> pd.Data
Args:
year_df (pd.DataFrame): A dataframe of facility records for a single year (can be .applied to a groupby
(year) object). Columns include percentages for each county and the fields needed for the calculations
county_fields (List[str]): List county field names
            county_fields (list[str]): List of county field names
Returns:
pd.DataFrame: A dataframe of tons recycled, composted, digested, and landfilled for each county along with
Expand Down Expand Up @@ -248,7 +251,7 @@ def facility_tons_diverted_from_landfills(year_df: pd.DataFrame) -> pd.DataFrame
Args:
year_df (pd.DataFrame): Dataframe of facility records for a single year (can be .applied to a groupby
year)).
year)).
Returns:
pd.DataFrame: Facility name, id, and total tons diverted from landfills
Expand Down Expand Up @@ -299,7 +302,8 @@ def rates_per_material(
"""Calculate recycling/composting rates for each material type for a given year.
Args:
year_df (pd.DataFrame): Dataframe of facility records for a single year (can be .applied to a groupby(year) object).
year_df (pd.DataFrame): Dataframe of facility records for a single year (can be .applied to a groupby(year)
object).
classification (str): Report Classification, either "Recycling" or "Composts"
fields (list[str]): List of the fields containing the material totals.
total_field (str): The field containing the total material received for the percentage calculation.
Expand Down Expand Up @@ -339,8 +343,8 @@ def rates_per_material(

@staticmethod
def statewide_yearly_metrics(county_year_df: pd.DataFrame) -> pd.DataFrame:
"""Calculate statewide yearly metrics for recycling, composting, digestion, and landfilling (RCDL), filtering out
out of state totals.
"""Calculate statewide yearly metrics for recycling, composting, digestion, and landfilling (RCDL), filtering
        out out-of-state totals.
Args:
county_year_df (pd.DataFrame): Dataframe of county summaries for a given year with the RCDL metrics (can be
Expand Down
137 changes: 124 additions & 13 deletions src/wmrc/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def process(self):
statewide_totals_df = county_summary_df.groupby("data_year").apply(
helpers.YearlyAnalysis.statewide_yearly_metrics
)
contamination_rates_df = Summarize.contamination_rates_by_tonnage(records)
contamination_rates_df = Summarize.uncontamination_rates_by_tonnage(records)
# contamination_rates_df = Summaries._contamination_rates_by_facility(records)
statewide_metrics = pd.concat([statewide_totals_df, contamination_rates_df], axis=1)
statewide_spatial = helpers.add_bogus_geometries(statewide_metrics)
Expand Down Expand Up @@ -215,7 +215,22 @@ def process(self):
loggers = [logging.getLogger(config.SKID_NAME), logging.getLogger("palletjack")]
self._remove_log_file_handlers(loggers)

def _update_counties(self, gis, county_summary_df):
def _update_counties(self, gis: arcgis.gis.GIS, county_summary_df: pd.DataFrame) -> int:
"""Updates the live county summary data on AGOL with data from salesforce using another feature service as a geometry source.
Truncates and loads after merging the updated data with the geometries. Relies on
config.COUNTY_BOUNDARIES_ITEMID and config.COUNTY_LAYER_ITEMID to access these layers.
The geometry source layer needs to have an extra geometry placed somewhere out of the normal extent named "Out
of State" for capturing info about material from out of the state.
Args:
gis (arcgis.gis.GIS): AGOL org with both the live layer and the geometry source layer
county_summary_df (pd.DataFrame): The county summary report generated from Salesforce records
Returns:
int: Number of records updated.
"""

county_geoms = transform.FeatureServiceMerging.get_live_dataframe(gis, config.COUNTY_BOUNDARIES_ITEMID)[
["name", "SHAPE"]
Expand All @@ -230,7 +245,20 @@ def _update_counties(self, gis, county_summary_df):
update_count = updater.truncate_and_load_features(new_data)
return update_count

def _update_facilities(self, gis, facility_summary_df):
def _update_facilities(self, gis: arcgis.gis.GIS, facility_summary_df: pd.DataFrame) -> int:
"""Updates the live facility data on AGOL with data from the Google sheets and Salesforce.
Truncates and loads after merging the live data with the updated data. Does not (currently) add new features.
Relies on config.FACILITIES_LAYER_ITEMID to access the live data. MSW facility info comes from the Google
sheet, while their total diverted comes from Salesforce. All UOCC data comes from the sheet.
Args:
gis (arcgis.gis.GIS): AGOL org with the live layer
facility_summary_df (pd.DataFrame): The facility summary report generated from Salesforce records
Returns:
int: Number of facilities loaded.
"""
self.skid_logger.info("Loading data from Google Sheets...")
combined_df = self._parse_from_google_sheets()
self.skid_logger.info("Adding county names from SGID county boundaries...")
Expand All @@ -242,6 +270,7 @@ def _update_facilities(self, gis, facility_summary_df):
on="id_",
)

#: NOTE: This .update() doesn't add new rows, so if a facility is added to the sheet it won't be added to the map
#: Merge live data with sheet/sf data
live_facility_data = transform.FeatureServiceMerging.get_live_dataframe(gis, config.FACILITIES_LAYER_ITEMID)
updated_facility_data = live_facility_data.set_index("id_")
Expand All @@ -266,15 +295,19 @@ def _update_facilities(self, gis, facility_summary_df):
],
)

# #: Fields from sheet that aren't in AGOL
# updated_facility_data.drop(columns=["local_health_department", "uocc_email_address"], inplace=True)

self.skid_logger.info("Truncating and loading...")
updater = load.FeatureServiceUpdater(gis, config.FACILITIES_LAYER_ITEMID, self.tempdir_path)
load_count = updater.truncate_and_load_features(updated_facility_data)
return load_count

def _parse_from_google_sheets(self):
def _parse_from_google_sheets(self) -> pd.DataFrame:
"""Load MSW and UOCC data from Google Sheets and combine them into a single dataframe.
Does some field cleaning and aligns the UOCC columns with the MSW columns for consistency.
Returns:
pd.DataFrame: Single dataframe with the unified data from the two sheets
"""
#: Get individual sheets
gsheet_extractor = extract.GSheetLoader(self.secrets.SERVICE_ACCOUNT_JSON)
sw_df = gsheet_extractor.load_specific_worksheet_into_dataframe(
Expand All @@ -283,10 +316,7 @@ def _parse_from_google_sheets(self):
uocc_df = gsheet_extractor.load_specific_worksheet_into_dataframe(self.secrets.SHEET_ID, "UOCCs", by_title=True)

#: Fix columns
try:
sw_df.drop(columns=[""], inplace=True) #: Drop empty columns that don't have a name
except KeyError:
pass
sw_df.drop(columns=[""], inplace=True, errors="ignore") #: Drop empty columns that don't have a name

sw_df.rename(
columns={"Accept Material\n Dropped \n Off by the Public": "Accept Material Dropped Off by the Public"},
Expand Down Expand Up @@ -317,7 +347,17 @@ def _parse_from_google_sheets(self):
return renamed_df

@staticmethod
def _get_county_names(input_df, gis):
def _get_county_names(input_df: pd.DataFrame, gis: arcgis.gis.GIS) -> pd.DataFrame:
"""Assigns a county name to each facility based on the SGID county boundaries hosted in AGOL.
Args:
input_df (pd.DataFrame): Facility dataframe with "latitude" and "longitude" columns
gis (arcgis.gis.GIS): AGOL org containing the county boundaries
Returns:
pd.DataFrame: A spatially-enabled dataframe of the facilities in WKID 26912 with county names added.
"""

#: Load counties from open data feature service
counties_df = pd.DataFrame.spatial.from_layer(
arcgis.features.FeatureLayer.fromitem(gis.content.get(config.COUNTIES_ITEMID))
Expand All @@ -344,6 +384,11 @@ def _get_county_names(input_df, gis):
return joined_points_df

def _load_salesforce_data(self) -> helpers.SalesForceRecords:
"""Helper method to connect to and load data from Salesforce.
Returns:
helpers.SalesForceRecords: An object containing the records from Salesforce along with other derived data.
"""

salesforce_credentials = extract.SalesforceApiUserCredentials(
self.secrets.SF_CLIENT_SECRET, self.secrets.SF_CLIENT_ID
Expand All @@ -363,6 +408,14 @@ class Summarize:

@staticmethod
def county_summaries(records: helpers.SalesForceRecords) -> pd.DataFrame:
"""Perform the county summary per year analysis on the Salesforce records.
Args:
records (helpers.SalesForceRecords): Salesforce records loaded into a helper object
Returns:
pd.DataFrame: County summary report indexed by county name with data_year column as integer
"""

county_df = records.df.groupby("Calendar_Year__c").apply(
helpers.YearlyAnalysis.county_summaries, county_fields=records.county_fields
Expand All @@ -379,6 +432,15 @@ def county_summaries(records: helpers.SalesForceRecords) -> pd.DataFrame:

@staticmethod
def facility_summaries(records: helpers.SalesForceRecords) -> pd.DataFrame:
"""Perform the facility summary per year analysis on the Salesforce records.
Args:
records (helpers.SalesForceRecords): Salesforce records loaded into a helper object
Returns:
pd.DataFrame: Facilities summary report with default index and data_year column as integer
"""

facility_summaries = (
records.df.groupby("Calendar_Year__c")
.apply(
Expand All @@ -394,6 +456,16 @@ def facility_summaries(records: helpers.SalesForceRecords) -> pd.DataFrame:

@staticmethod
def materials_recycled(records: helpers.SalesForceRecords) -> pd.DataFrame:
"""Perform the materials recycled analysis per year on the Salesforce records.
Args:
records (helpers.SalesForceRecords): Salesforce records loaded into a helper object. Relies on both df and
field_mapping attributes.
Returns:
pd.DataFrame: Materials recycled report with default index and year_ column as integer
"""

recycling_fields = [
"Combined Total of Material Received",
"Total Corrugated Boxes received",
Expand Down Expand Up @@ -435,6 +507,16 @@ def materials_recycled(records: helpers.SalesForceRecords) -> pd.DataFrame:

@staticmethod
def materials_composted(records: helpers.SalesForceRecords) -> pd.DataFrame:
"""Perform the materials composted per year analysis on the Salesforce records.
Args:
records (helpers.SalesForceRecords): Helper object containing the Salesforce records. Relies on both df and
field_mapping attributes.
Returns:
pd.DataFrame: Materials composted report with default index and year_ column as integer
"""

composting_fields = [
"Municipal Solid Waste",
"Total Material Received Compost",
Expand Down Expand Up @@ -469,19 +551,38 @@ def materials_composted(records: helpers.SalesForceRecords) -> pd.DataFrame:
return materials_composted

@staticmethod
def contamination_rates_by_tonnage(records: helpers.SalesForceRecords) -> pd.DataFrame:
def uncontamination_rates_by_tonnage(records: helpers.SalesForceRecords) -> pd.Series:
"""Calculates a yearly uncontamination rate based on the Salesforce records.
        Uncontamination rate is the opposite of the contamination rate (5% contamination = 95% uncontaminated). Rate is
calculated by using the contamination rate to determine contaminated tonnage and comparing that to the total
tonnage handled by facilities reporting a contamination rate.
Args:
records (helpers.SalesForceRecords): Helper object containing the Salesforce records
Returns:
pd.Series: Uncontamination rates per year with index name data_year and series name
"annual_recycling_uncontaminated_rate"
"""
#: First, create a modifier to account for material from out-of-state
records.df["in_state_modifier"] = (100 - records.df["Out_of_State__c"]) / 100

#: Calculate contaminated tonnage
records.df["recycling_tons_contaminated"] = (
records.df["Annual_Recycling_Contamination_Rate__c"]
/ 100
* records.df["Combined_Total_of_Material_Recycled__c"]
* records.df["in_state_modifier"]
)

#: Calculate total tonnage from facilities reporting a contamination rate
records.df["recycling_tons_report_contamination_total"] = pd.NA
records.df.loc[
~records.df["recycling_tons_contaminated"].isnull(), "recycling_tons_report_contamination_total"
] = (records.df["Combined_Total_of_Material_Recycled__c"] * records.df["in_state_modifier"])

#: Invert to get uncontaminated rate
clean_rates = records.df.groupby("Calendar_Year__c").apply(
lambda year_df: (
1
Expand All @@ -500,6 +601,16 @@ def contamination_rates_by_tonnage(records: helpers.SalesForceRecords) -> pd.Dat

@staticmethod
def contamination_rates_by_facility(records: helpers.SalesForceRecords) -> pd.DataFrame:
"""Return average facility uncontamination rates by year by averaging the individual facility percentages.
        Uncontamination rate is the opposite of the contamination rate (5% contamination = 95% uncontaminated).
Args:
records (helpers.SalesForceRecords): Helper object containing the Salesforce records
Returns:
pd.DataFrame: Dataframe of count, mean, and std per year from .describe()
"""

records.df["annual_recycling_uncontaminated_rate"] = 100 - records.df["Annual_Recycling_Contamination_Rate__c"]
yearly_stats = records.df.groupby("Calendar_Year__c").describe()
Expand Down
8 changes: 4 additions & 4 deletions tests/test_wmrc.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def test_county_summaries_replace_nan_with_0(self, mocker):

pd.testing.assert_frame_equal(result_df, test_df)

def test_contamination_rates_by_tonnage_happy_path(self, mocker):
def test_uncontamination_rates_by_tonnage_happy_path(self, mocker):
records = mocker.Mock()
records.df = pd.DataFrame(
{
Expand All @@ -150,7 +150,7 @@ def test_contamination_rates_by_tonnage_happy_path(self, mocker):
}
)

output_series = main.Summarize.contamination_rates_by_tonnage(records)
output_series = main.Summarize.uncontamination_rates_by_tonnage(records)

test_df = pd.Series(
{
Expand All @@ -163,7 +163,7 @@ def test_contamination_rates_by_tonnage_happy_path(self, mocker):

pd.testing.assert_series_equal(output_series, test_df)

def test_contamination_rates_by_tonnage_uses_out_of_state_modifier(self, mocker):
def test_uncontamination_rates_by_tonnage_uses_out_of_state_modifier(self, mocker):
records = mocker.Mock()
records.df = pd.DataFrame(
{
Expand All @@ -175,7 +175,7 @@ def test_contamination_rates_by_tonnage_uses_out_of_state_modifier(self, mocker)
}
)

output_series = main.Summarize.contamination_rates_by_tonnage(records)
output_series = main.Summarize.uncontamination_rates_by_tonnage(records)

test_df = pd.Series(
{
Expand Down

0 comments on commit 39a275a

Please sign in to comment.