Skip to content

Commit

Permalink
chore: make ruff happy
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobdadams committed Dec 14, 2023
1 parent 019bc33 commit 1bed54d
Showing 1 changed file with 67 additions and 68 deletions.
135 changes: 67 additions & 68 deletions src/wmrc/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
Run the wmrc script as a cloud function.
"""
import json
import locale
import logging
import os
import pprint
import sys
from datetime import date, datetime
from pathlib import Path
Expand All @@ -17,8 +14,8 @@
import arcgis
import google.auth
import pandas as pd
from arcgis.features import GeoAccessor, GeoSeriesAccessor
from palletjack import extract, transform, load
from arcgis.features import GeoAccessor, GeoSeriesAccessor # noqa: F401
from palletjack import extract, load, transform
from supervisor.message_handlers import SendGridHandler
from supervisor.models import MessageDetails, Supervisor

Expand All @@ -41,21 +38,21 @@ def _get_secrets():
dict: The secrets .json loaded as a dictionary
"""

secret_folder = Path('/secrets')
secret_folder = Path("/secrets")

#: Try to get the secrets from the Cloud Function mount point
if secret_folder.exists():
secrets_dict = json.loads(Path('/secrets/app/secrets.json').read_text(encoding='utf-8'))
secrets_dict = json.loads(Path("/secrets/app/secrets.json").read_text(encoding="utf-8"))
credentials, _ = google.auth.default()
secrets_dict['SERVICE_ACCOUNT_JSON'] = credentials
secrets_dict["SERVICE_ACCOUNT_JSON"] = credentials
return secrets_dict

#: Otherwise, try to load a local copy for local development
secret_folder = (Path(__file__).parent / 'secrets')
secret_folder = Path(__file__).parent / "secrets"
if secret_folder.exists():
return json.loads((secret_folder / 'secrets.json').read_text(encoding='utf-8'))
return json.loads((secret_folder / "secrets.json").read_text(encoding="utf-8"))

raise FileNotFoundError('Secrets folder not found; secrets not loaded.')
raise FileNotFoundError("Secrets folder not found; secrets not loaded.")


def _initialize(log_path, sendgrid_api_key):
Expand All @@ -71,17 +68,17 @@ def _initialize(log_path, sendgrid_api_key):

skid_logger = logging.getLogger(config.SKID_NAME)
skid_logger.setLevel(config.LOG_LEVEL)
palletjack_logger = logging.getLogger('palletjack')
palletjack_logger = logging.getLogger("palletjack")
palletjack_logger.setLevel(config.LOG_LEVEL)

cli_handler = logging.StreamHandler(sys.stdout)
cli_handler.setLevel(config.LOG_LEVEL)
formatter = logging.Formatter(
fmt='%(levelname)-7s %(asctime)s %(name)15s:%(lineno)5s %(message)s', datefmt='%Y-%m-%d %H:%M:%S'
fmt="%(levelname)-7s %(asctime)s %(name)15s:%(lineno)5s %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
cli_handler.setFormatter(formatter)

log_handler = logging.FileHandler(log_path, mode='w')
log_handler = logging.FileHandler(log_path, mode="w")
log_handler.setLevel(config.LOG_LEVEL)
log_handler.setFormatter(formatter)

Expand All @@ -95,10 +92,10 @@ def _initialize(log_path, sendgrid_api_key):
#: (all log messages were duplicated if put at beginning)
logging.captureWarnings(True)

skid_logger.debug('Creating Supervisor object')
skid_logger.debug("Creating Supervisor object")
skid_supervisor = Supervisor(handle_errors=False)
sendgrid_settings = config.SENDGRID_SETTINGS
sendgrid_settings['api_key'] = sendgrid_api_key
sendgrid_settings["api_key"] = sendgrid_api_key
skid_supervisor.add_message_handler(
SendGridHandler(
sendgrid_settings=sendgrid_settings, client_name=config.SKID_NAME, client_version=version.__version__
Expand All @@ -122,13 +119,12 @@ def _remove_log_file_handlers(log_name, loggers):
if log_name in handler.stream.name:
logger.removeHandler(handler)
handler.close()
except Exception as error:
except Exception:
pass


def process():
"""The main function that does all the work.
"""
"""The main function that does all the work."""

#: Set up secrets, tempdir, supervisor, and logging
start = datetime.now()
Expand All @@ -151,113 +147,116 @@ def process():
gis = arcgis.gis.GIS(config.AGOL_ORG, secrets.AGOL_USER, secrets.AGOL_PASSWORD)

#: Do the work
module_logger.info('Loading data from Google Sheets...')
module_logger.info("Loading data from Google Sheets...")
combined_df = _parse_from_google_sheets(secrets)
module_logger.info('Adding county names from SGID county boundaries...')
module_logger.info("Adding county names from SGID county boundaries...")
with_counties_df = _get_county_names(combined_df, gis)

module_logger.info('Preparing data for truncate and load...')
module_logger.info("Preparing data for truncate and load...")
proj_df = with_counties_df.copy()
proj_df.spatial.project(4326)
proj_df.spatial.set_geometry('SHAPE')
proj_df.spatial.sr = {'wkid': 4326}
proj_df['last_updated'] = date.today()
proj_df = transform.DataCleaning.switch_to_datetime(proj_df, ['last_updated'])
proj_df.spatial.set_geometry("SHAPE")
proj_df.spatial.sr = {"wkid": 4326}
proj_df["last_updated"] = date.today()
proj_df = transform.DataCleaning.switch_to_datetime(proj_df, ["last_updated"])
proj_df = transform.DataCleaning.switch_to_float(
proj_df, [
'latitude',
'longitude',
'tons_of_material_diverted_from_',
'gallons_of_used_oil_collected_for_recycling_last_year',
]
proj_df,
[
"latitude",
"longitude",
"tons_of_material_diverted_from_",
"gallons_of_used_oil_collected_for_recycling_last_year",
],
)

module_logger.info('Truncating and loading...')
module_logger.info("Truncating and loading...")
updater = load.FeatureServiceUpdater(gis, config.FEATURE_LAYER_ITEMID, tempdir)
load_count = updater.truncate_and_load_features(proj_df)

end = datetime.now()

summary_message = MessageDetails()
summary_message.subject = f'{config.SKID_NAME} Update Summary'
summary_message.subject = f"{config.SKID_NAME} Update Summary"
summary_rows = [
f'{config.SKID_NAME} update {start.strftime("%Y-%m-%d")}',
'=' * 20,
'',
"=" * 20,
"",
f'Start time: {start.strftime("%H:%M:%S")}',
f'End time: {end.strftime("%H:%M:%S")}',
f'Duration: {str(end-start)}',
'',
f'Rows loaded: {load_count}',
f"Duration: {str(end-start)}",
"",
f"Rows loaded: {load_count}",
]

summary_message.message = '\n'.join(summary_rows)
summary_message.message = "\n".join(summary_rows)
summary_message.attachments = tempdir_path / log_name

skid_supervisor.notify(summary_message)

#: Remove file handler so the tempdir will close properly
loggers = [logging.getLogger(config.SKID_NAME), logging.getLogger('palletjack')]
loggers = [logging.getLogger(config.SKID_NAME), logging.getLogger("palletjack")]
_remove_log_file_handlers(log_name, loggers)


def _parse_from_google_sheets(secrets):

#: Get individual sheets
gsheet_extractor = extract.GSheetLoader(secrets.SERVICE_ACCOUNT_JSON)
sw_df = gsheet_extractor.load_specific_worksheet_into_dataframe(secrets.SHEET_ID, 'SW Facilities', by_title=True)
uocc_df = gsheet_extractor.load_specific_worksheet_into_dataframe(secrets.SHEET_ID, 'UOCCs', by_title=True)
sw_df = gsheet_extractor.load_specific_worksheet_into_dataframe(secrets.SHEET_ID, "SW Facilities", by_title=True)
uocc_df = gsheet_extractor.load_specific_worksheet_into_dataframe(secrets.SHEET_ID, "UOCCs", by_title=True)

#: Fix columns
sw_df.drop(columns=[''], inplace=True) #: Drop empty columns that don't have a name
sw_df.drop(columns=[""], inplace=True) #: Drop empty columns that don't have a name
sw_df.rename(
columns={'Accept Material\n Dropped \n Off by the Public': 'Accept Material Dropped Off by the Public'},
inplace=True
columns={"Accept Material\n Dropped \n Off by the Public": "Accept Material Dropped Off by the Public"},
inplace=True,
)
uocc_df.rename(
columns={
'Type': 'Class',
'Accept Material\n Dropped \n Off by the Public': 'Accept Material Dropped Off by the Public'
"Type": "Class",
"Accept Material\n Dropped \n Off by the Public": "Accept Material Dropped Off by the Public",
},
inplace=True
inplace=True,
)
combined_df = pd.concat([sw_df, uocc_df]).query('Status in ["Open", "OPEN"]')

renamed_df = transform.DataCleaning.rename_dataframe_columns_for_agol(combined_df).rename(columns=str.lower).rename(
columns={
'longitude_': 'longitude',
'accept_material_dropped_off_by_the_public': 'accept_material_dropped_off_by_',
'tons_of_material_diverted_from_landfills_last_year': 'tons_of_material_diverted_from_'
}
renamed_df = (
transform.DataCleaning.rename_dataframe_columns_for_agol(combined_df)
.rename(columns=str.lower)
.rename(
columns={
"longitude_": "longitude",
"accept_material_dropped_off_by_the_public": "accept_material_dropped_off_by_",
"tons_of_material_diverted_from_landfills_last_year": "tons_of_material_diverted_from_",
}
)
)

return renamed_df


def _get_county_names(input_df, gis):

#: Load counties from open data feature service
counties_df = pd.DataFrame.spatial.from_layer(
arcgis.features.FeatureLayer.fromitem(gis.content.get(config.COUNTIES_ITEMID))
)
counties_df.spatial.project(26912)
counties_df.reset_index(inplace=True)
counties_df = counties_df.reindex(columns=['SHAPE', 'NAME']) #: We only care about the county name
counties_df.spatial.set_geometry('SHAPE')
counties_df.spatial.sr = {'wkid': 26912}
counties_df = counties_df.reindex(columns=["SHAPE", "NAME"]) #: We only care about the county name
counties_df.spatial.set_geometry("SHAPE")
counties_df.spatial.sr = {"wkid": 26912}

#: Convert dataframe to spatial
spatial_df = pd.DataFrame.spatial.from_xy(input_df, x_column='longitude', y_column='latitude')
spatial_df = pd.DataFrame.spatial.from_xy(input_df, x_column="longitude", y_column="latitude")
spatial_df.reset_index(drop=True, inplace=True)
spatial_df.spatial.project(26912)
spatial_df.spatial.set_geometry('SHAPE')
spatial_df.spatial.sr = {'wkid': 26912}
spatial_df.spatial.set_geometry("SHAPE")
spatial_df.spatial.sr = {"wkid": 26912}

#: Perform the join, clean up the output
joined_points_df = spatial_df.spatial.join(counties_df, 'left', 'within')
joined_points_df.drop(columns=['index_right'], inplace=True)
joined_points_df.rename(columns={'NAME': 'county_name'}, inplace=True)
joined_points_df['county_name'] = joined_points_df['county_name'].str.title()
joined_points_df = spatial_df.spatial.join(counties_df, "left", "within")
joined_points_df.drop(columns=["index_right"], inplace=True)
joined_points_df.rename(columns={"NAME": "county_name"}, inplace=True)
joined_points_df["county_name"] = joined_points_df["county_name"].str.title()

return joined_points_df

Expand Down Expand Up @@ -293,5 +292,5 @@ def main(event, context): # pylint: disable=unused-argument


#: Putting this here means you can call the file via `python main.py` and it will run. Useful for pre-GCF testing.
if __name__ == '__main__':
if __name__ == "__main__":
process()

0 comments on commit 1bed54d

Please sign in to comment.