diff --git a/src/parser/__init__.py b/src/parser/__init__.py index 7b7b973..40359de 100644 --- a/src/parser/__init__.py +++ b/src/parser/__init__.py @@ -171,6 +171,13 @@ def parse(self, county, case_number, test): #remove the test value here and just # Adds a hash to the JSON file of the underlying HTML body = case_soup.find("body") + """ + Why balance table is dropped before hashing: + The balance table is excluded from the hashing because + balance is updated as any costs are paid off. Otherwise, + the hash would change frequently and multiple versions + of the case would be captured that we don't want. + """ balance_table = body.find_all("table")[-1] if "Balance Due" in balance_table.text: balance_table.decompose() diff --git a/src/scraper/__init__.py b/src/scraper/__init__.py index 4feec12..cd0881c 100644 --- a/src/scraper/__init__.py +++ b/src/scraper/__init__.py @@ -1,9 +1,7 @@ import logging import os -import re import csv import urllib.parse -import json import sys from datetime import datetime, timedelta from time import time @@ -11,85 +9,194 @@ from bs4 import BeautifulSoup from .helpers import * import importlib +from typing import Optional, Tuple, Callable, Type, List +import importlib.util +import re class Scraper: """Scrape Odyssey html files into an output folder""" def __init__(self): pass - def set_defaults(self, ms_wait, start_date, end_date, court_calendar_link_text, case_number): - if not ms_wait: - ms_wait = 200 - if not start_date: - start_date = '2024-07-01' - if not end_date: - end_date = '2024-07-01' - if not court_calendar_link_text: - court_calendar_link_text = "Court Calendar" - if not case_number: - case_number = None - return ms_wait, start_date, end_date, court_calendar_link_text, case_number - - def configure_logger(self): - # configure the logger - logger = logging.getLogger(name="pid: " + str(os.getpid())) - logging.basicConfig() - logging.root.setLevel(level="INFO") - logger.info("Scraper class initialized") + def set_defaults( + self, + ms_wait: int | None = None, + start_date: str | None = None, + end_date: str | None = None, + court_calendar_link_text: str | None = None, + case_number: str | None = None, + ssl: bool | None = None, + county: str | None = None, + case_html_path: str | None = None, + ) -> Tuple[int, str, str, str, Optional[str], bool, str, str]: + """ + Sets default values for the provided optional parameters. + + Defaults: + - `ms_wait`: 200 milliseconds if not provided. + - `start_date`: '2024-07-01' if not provided. + - `end_date`: '2024-07-01' if not provided. + - `court_calendar_link_text`: 'Court Calendar' if not provided. + - `case_number`: None if not provided. + + :param ms_wait: Milliseconds to wait. + :param start_date: Start date in YYYY-MM-DD format. + :param end_date: End date in YYYY-MM-DD format. + :param court_calendar_link_text: Text for the court calendar link. + :param case_number: Case number, or None. + + :returns: A tuple containing: + - ms_wait (int): Milliseconds to wait. + - start_date (str): Start date. + - end_date (str): End date. + - court_calendar_link_text (str): Text for court calendar link. + - case_number (Optional[str]): Case number or None. 
+ """ + + # Assign default values if parameters are not provided + ms_wait = ms_wait if ms_wait is not None else 200 + start_date = start_date if start_date is not None else '2024-07-01' + end_date = end_date if end_date is not None else '2024-07-01' + court_calendar_link_text = court_calendar_link_text if court_calendar_link_text is not None else "Court Calendar" + # case_number defaults to None if not provided + case_number = case_number + ssl = ssl if ssl is not None else True + county = county if county is not None else 'hays' + case_html_path = case_html_path if case_html_path is not None else os.path.join(os.path.dirname(__file__), "..", "..", "data", county, "case_html") + return ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path + + def configure_logger(self) -> logging.Logger: + """ + Configures and returns a logger instance for the scraper class. + + This method sets up the logger with a unique name based on the process ID, + configures the logging level to INFO, and logs an initialization message. + + :returns: Configured logger instance. + """ + # Configure the logger + logger = logging.getLogger(name=f"pid: {os.getpid()}") + + # Set up basic configuration for the logging system + logging.basicConfig(level=logging.INFO) + return logger - def format_county(self, county): - county = county.lower() - return county + def format_county(self, county: str) -> str: + """ + Formats the county name to lowercase. + + :param county: The name of the county to be formatted. + :returns: The county name in lowercase. + :raises TypeError: If the provided county name is not a string. + """ + + return re.sub(r'[^\w]+', '', county.lower()) + + def create_session(self, logger: logging.Logger, ssl) -> requests.sessions.Session: + """ + Sets up a `requests.Session` with or without SSL verification and suppresses + related warnings. - def create_session(self): + Defaults to enable SSL. + + :param logger: Logger instance for logging errors. + :returns: Configured session object. + """ + # Create and configure the session session = requests.Session() - session.verify = False - requests.packages.urllib3.disable_warnings( - requests.packages.urllib3.exceptions.InsecureRequestWarning - ) + + # Optionally SSL certificate verification. Default to True unless False passed. + session.verify = ssl + requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) + return session - def make_directories(self, county): - # make directories if not present - case_html_path = os.path.join( - os.path.dirname(__file__), "..", "..", "data", county, "case_html" - ) + def make_directories(self, county: str, logger: logging.Logger, case_html_path) -> str: + """ + Creates necessary directories for storing case HTML files. + + This method constructs a path based on the county name and ensures that + all required directories in the path are created. If the directories already + exist, no action is taken. + + :param county: The name of the county, used to create a specific directory path. + :param logger: Logger instance for logging errors. + :returns: The path to the created directories. + :raises OSError: If there is an error creating the directories. 
+ """ + + # Create the directories if they do not exist os.makedirs(case_html_path, exist_ok=True) + return case_html_path - def get_ody_link(self, county, logger): - # get county portal and version year information from csv file - base_url = odyssey_version = notes = None - with open( - os.path.join( - os.path.dirname(__file__), "..", "..", "resources", "texas_county_data.csv" - ), - mode="r", - ) as file_handle: - csv_file = csv.DictReader(file_handle) - for row in csv_file: - if row["county"].lower() == county.lower(): - base_url = row["portal"] - # add trailing slash if not present, otherwise urljoin breaks - if base_url[-1] != "/": - base_url += "/" - logger.info(f"{base_url} - scraping this url") - odyssey_version = int(row["version"].split(".")[0]) - notes = row["notes"] - break - if not base_url or not odyssey_version: - raise Exception( - "The required data to scrape this county is not in ./resources/texas_county_data.csv" - ) + # get county portal URL, Odyssey version, and notes from csv file + def get_ody_link(self, + county: str, + logger: logging.Logger + ) -> Tuple[str, str, str ]: + """ + Retrieves Odyssey-related information for a given county from a CSV file. + + This function reads county-specific data from a CSV file located in the `resources` directory. + It searches for the county name in the CSV file, extracts the corresponding base URL, Odyssey + version, and any additional notes. The base URL is formatted with a trailing slash if necessary. + + :param county: The name of the county for which to retrieve Odyssey information. + :param logger: Logger instance for logging errors and information. + :returns: A tuple containing: + - base_url (str): The base URL for the county’s portal. + - odyssey_version (str): The major version of Odyssey associated with the county. + - notes (str): Additional notes related to the county. + :raises Exception: If the county is not found in the CSV file or if required data is missing. + """ + + try: + base_url = odyssey_version = notes = None + # CSV is located in 'resources' folder + with open( + os.path.join(os.path.dirname(__file__), "..", "..", "resources", "texas_county_data.csv"), + mode="r", + ) as file_handle: + csv_file = csv.DictReader(file_handle) + for row in csv_file: + if row["county"].lower() == county.lower(): + base_url = row["portal"] + # add trailing slash if not present, otherwise urljoin breaks + if base_url[-1] != "/": + base_url += "/" + logger.info(f"{base_url} - scraping this url") + odyssey_version = int(row["version"].split(".")[0]) + notes = row["notes"] + break + if not base_url or not odyssey_version: + raise Exception("The required data to scrape this county is not in /resources/texas_county_data.csv") + except Exception as e: + logger.exception(e, "Error getting county-specific information from csv.") + raise return base_url, odyssey_version, notes - def get_class_and_method(self, county): - # Construct the module, class, and method names - module_name = county #ex: 'hays' - class_name = f"Scraper{county.capitalize()}" #ex: 'ScraperHays' - method_name = f"scraper_{county}" #ex: 'scraper_hays' + def get_class_and_method( + self, + county: str, + logger: logging.Logger + ) -> Tuple[Type[object], Callable]: + """ + Dynamically imports a module, retrieves a class, and gets a method from it based on the county name. + + :param county: The name of the county, used to construct module, class, and method names. + :param logger: Logger instance for logging errors. 
+ :returns: A tuple containing the instance of the class and the method callable. + :raises ImportError: If the module cannot be imported. + :raises AttributeError: If the class or method cannot be found. + """ + + module_name = county + class_name = f"Scraper{county.capitalize()}" + method_name = f"scraper_{county}" + # Add the current directory to the system path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) @@ -98,32 +205,55 @@ def get_class_and_method(self, county): module = importlib.import_module(module_name) # Retrieve the class from the module - cls = getattr(module, class_name) + cls = getattr(module, class_name, None) if cls is None: - print(f"Class '{class_name}' not found in module '{module_name}'.") - return None, None - + raise AttributeError(f"Class '{class_name}' not found in module '{module_name}'") + # Instantiate the class instance = cls() # Retrieve the method with the specified name method = getattr(instance, method_name, None) if method is None: - print(f"Method '{method_name}' not found in class '{class_name}'.") - return instance, None - + raise AttributeError(f"Method '{method_name}' not found in class '{class_name}'") + return instance, method - except ModuleNotFoundError: - print(f"Module '{module_name}' not found.") - return None, None - def scrape_main_page(self, base_url, odyssey_version, session, notes, logger, ms_wait): - # if odyssey_version < 2017, scrape main page first to get necessary data - if odyssey_version < 2017: + except (FileNotFoundError, ImportError, AttributeError) as e: + logger.exception(e, "Error dynamically loading module or retrieving class/method.") + raise + + def scrape_main_page(self, + base_url: str, + odyssey_version: int, + session: requests.sessions.Session, + notes: str, + logger: logging.Logger, + ms_wait: int + ) -> Tuple[str, BeautifulSoup]: + """ + Scrapes the main page of the Odyssey site, handling login if required, and returns the page's HTML and parsed content. + + This function handles a special case where some sites may require a public guest login. If the `notes` parameter + contains a "PUBLICLOGIN#" identifier, it will extract the username and password from the `notes`, perform the login, + and then proceed to scrape the main page. + + :param base_url: The base URL of the main page to scrape. + :param odyssey_version: The version of Odyssey; currently not used in this function. + :param session: The `requests` session object used for making HTTP requests. + :param notes: A string containing notes that may include login credentials in the format "PUBLICLOGIN#username/password". + :param logger: Logger instance for logging errors and debug information. + :param ms_wait: The number of milliseconds to wait between retry attempts. + :returns: A tuple containing: + - main_page_html (str): The raw HTML content of the main page. + - main_soup (BeautifulSoup): A BeautifulSoup object containing the parsed HTML content. + :raises Exception: If any error occurs during the HTTP requests or HTML parsing. 
+ """ + + try: # some sites have a public guest login that must be used if "PUBLICLOGIN#" in notes: userpass = notes.split("#")[1].split("/") - data = { "UserName": userpass[0], "Password": userpass[1], @@ -132,7 +262,7 @@ def scrape_main_page(self, base_url, odyssey_version, session, notes, logger, ms "SignOn": "Sign On", } - response = request_page_with_retry( + request_page_with_retry( session=session, url=urllib.parse.urljoin(base_url, "login.aspx"), logger=logger, @@ -150,31 +280,72 @@ def scrape_main_page(self, base_url, odyssey_version, session, notes, logger, ms ms_wait=ms_wait, ) main_soup = BeautifulSoup(main_page_html, "html.parser") - return main_page_html, main_soup + except Exception as e: + logger.exception(e, f"Error scraping main page for main page HTML.") + raise + return main_page_html, main_soup - def scrape_search_page(self, base_url, odyssey_version, main_page_html, main_soup, session, logger, ms_wait, court_calendar_link_text): - # build url for court calendar + def scrape_search_page( + self, + base_url: str, + odyssey_version: int, + main_page_html: str, + main_soup: BeautifulSoup, + session: requests.sessions.Session, + logger: logging.Logger, + ms_wait: int, + court_calendar_link_text: str + ) -> Tuple[str, str, BeautifulSoup]: + """ + Scrapes the search page URL and data based on the main page content. + + This method extracts the search page ID from the court calendar link, constructs the URL for the search page, + and retrieves the search page HTML. Depending on the Odyssey version, it either uses the extracted URL or a + default URL. It then parses the search page HTML into a BeautifulSoup object. + + :param base_url: The base URL for constructing full URLs. + :param odyssey_version: The version of Odyssey, used to determine the correct URL and verification text. + :param main_page_html: The HTML content of the main page. + :param main_soup: Parsed BeautifulSoup object of the main page HTML. + :param session: The session object for making HTTP requests. + :param logger: Logger instance for logging errors and information. + :param ms_wait: Milliseconds to wait before making requests. + :param court_calendar_link_text: Text to search for in the court calendar link. + :returns: A tuple containing the search page URL, search page HTML, and the BeautifulSoup object of the search page. + :raises ValueError: If the court calendar link is not found on the main page. 
+ """ + + # Extract the search page ID from the court calendar link search_page_id = None for link in main_soup.select("a.ssSearchHyperlink"): if court_calendar_link_text in link.text: search_page_id = link["href"].split("?ID=")[1].split("'")[0] + break # Exit loop once the link is found + if not search_page_id: write_debug_and_quit( verification_text="Court Calendar link", page_text=main_page_html, logger=logger, ) - search_url = base_url + "Search.aspx?ID=" + search_page_id + raise ValueError("Court Calendar link not found on the main page.") - # hit the search page to gather initial data + # Build the URL for the search page + search_url = f"{base_url}Search.aspx?ID={search_page_id}" + + # Determine the correct URL and verification text based on Odyssey version + if odyssey_version < 2017: + search_url = search_url + verification_text = "Court Calendar" + else: + search_url = urllib.parse.urljoin(base_url, "Home/Dashboard/26") + verification_text = "SearchCriteria.SelectedCourt" + + # Hit the search page to gather initial data search_page_html = request_page_with_retry( session=session, - url=search_url - if odyssey_version < 2017 - else urllib.parse.urljoin(base_url, "Home/Dashboard/26"), - verification_text="Court Calendar" - if odyssey_version < 2017 - else "SearchCriteria.SelectedCourt", + url=search_url, + verification_text=verification_text, http_method=HTTPMethod.GET, logger=logger, ms_wait=ms_wait, @@ -183,28 +354,64 @@ def scrape_search_page(self, base_url, odyssey_version, main_page_html, main_sou return search_url, search_page_html, search_soup - def get_hidden_values(self, odyssey_version, main_soup, search_soup, logger): - # we need these hidden values to POST a search + def get_hidden_values( + self, + odyssey_version: int, + main_soup: BeautifulSoup, + search_soup: BeautifulSoup, + logger: logging.Logger + ) -> Dict[str, str]: + """ + Extracts hidden input values and additional data from the search page. + + :param odyssey_version: The version of Odyssey to determine logic. + :param main_soup: Parsed BeautifulSoup object of the main page HTML. + :param search_soup: Parsed BeautifulSoup object of the search page HTML. + :param logger: Logger instance for logging information. + :returns: Dictionary of hidden input names and their values. + """ + + # Extract hidden input values hidden_values = { hidden["name"]: hidden["value"] for hidden in search_soup.select('input[type="hidden"]') if hidden.has_attr("name") } - # get nodedesc and nodeid information from main page location select box + + # Get NodeDesc and NodeID information based on Odyssey version if odyssey_version < 2017: - location_option = main_soup.findAll("option")[0] - logger.info(f"location: {location_option.text}") - hidden_values.update( - {"NodeDesc": location_option.text, "NodeID": location_option["value"]} - ) + location_option = main_soup.find_all("option")[0] + logger.info(f"Location: {location_option.text}") + hidden_values.update({ + "NodeDesc": location_option.text, + "NodeID": location_option["value"] + }) else: - hidden_values["SearchCriteria.SelectedCourt"] = hidden_values[ - "Settings.DefaultLocation" - ] # TODO: Search in default court. Might need to add further logic later to loop through courts. 
+ hidden_values["SearchCriteria.SelectedCourt"] = hidden_values.get("Settings.DefaultLocation", "") + return hidden_values - def get_search_results(self, session, search_url, logger, ms_wait, hidden_values, case_number): - # POST a request for search results + def get_search_results( + self, + session: requests.sessions.Session, + search_url: str, + logger: logging.Logger, + ms_wait: int, + hidden_values: Dict[str, str], + case_number: Optional[str] + ) -> BeautifulSoup: + """ + Retrieves search results from the search page. + + :param session: The session object for making HTTP requests. + :param search_url: The URL to request search results from. + :param logger: Logger instance for logging information. + :param ms_wait: Milliseconds to wait before making requests. + :param hidden_values: Dictionary of hidden input values. + :param case_number: Case number for searching. + :returns: Parsed BeautifulSoup object of the search results page HTML. + """ + results_page_html = request_page_with_retry( session=session, url=search_url, @@ -213,156 +420,213 @@ def get_search_results(self, session, search_url, logger, ms_wait, hidden_values data=create_single_case_search_form_data(hidden_values, case_number), ms_wait=ms_wait, ) - results_soup = BeautifulSoup(results_page_html, "html.parser") - return results_soup + return BeautifulSoup(results_page_html, "html.parser") + + def scrape_individual_case( + self, + base_url: str, + search_url: str, + hidden_values: Dict[str, str], + case_number: Optional[str], + case_html_path: str, + session: requests.sessions.Session, + logger: logging.Logger, + ms_wait: int + ) -> None: - def scrape_individual_case(self, base_url, search_url, hidden_values, case_number, case_html_path, session, logger, ms_wait): # Individual case search logic results_soup = self.get_search_results(session, search_url, logger, ms_wait, hidden_values, case_number) case_urls = [ base_url + anchor["href"] for anchor in results_soup.select('a[href^="CaseDetail"]') ] + logger.info(f"{len(case_urls)} entries found") - case_id = case_urls[0].split("=")[1] - logger.info(f"{case_id} - scraping case") - # make request for the case - case_html = request_page_with_retry( - session=session, - url=case_urls[0], - verification_text="Date Filed", - logger=logger, - ms_wait=ms_wait, - ) - # write html case data - logger.info(f"{len(case_html)} response string length") + + if case_urls: + case_id = case_urls[0].split("=")[1] + logger.info(f"{case_id} - scraping case") + + case_html = request_page_with_retry( + session=session, + url=case_urls[0], + verification_text="Date Filed", + logger=logger, + ms_wait=ms_wait, + ) + + logger.info(f"{len(case_html)} response string length") - with open( - os.path.join(case_html_path, f"{case_id}.html"), "w" - ) as file_handle: - file_handle.write(case_html) + with open( + os.path.join(case_html_path, f"{case_id}.html"), "w" + ) as file_handle: + file_handle.write(case_html) + else: + logger.warning("No case URLs found.") - def scrape_jo_list(self, odyssey_version, search_soup, judicial_officers, logger): - # get a list of JOs to their IDs from the search page + def scrape_jo_list( + self, + odyssey_version: int, + search_soup: BeautifulSoup, + judicial_officers: Optional[List[str]], + logger: logging.Logger + ) -> Tuple[List[str], Dict[str, str]]: + """ + Scrapes a list of judicial officers and their IDs from the search page. + + Optionally receives a list of judicial officers to scrape. + + :param odyssey_version: The version of Odyssey to determine the selector. 
+ :param search_soup: Parsed BeautifulSoup object of the search page HTML. + :param judicial_officers: List of specific judicial officers to use. + :param logger: Logger instance for logging information. + :returns: Tuple containing a list of judicial officers to use and a dictionary of judicial officers and their IDs. + """ + + selector = 'select[labelname="Judicial Officer:"] > option' if odyssey_version < 2017 else 'select[id="selHSJudicialOfficer"] > option' judicial_officer_to_ID = { option.text: option["value"] - for option in search_soup.select( - 'select[labelname="Judicial Officer:"] > option' - if odyssey_version < 2017 - else 'select[id="selHSJudicialOfficer"] > option' - ) + for option in search_soup.select(selector) if option.text } - # if juidicial_officers param is not specified, use all of them + if not judicial_officers: judicial_officers = list(judicial_officer_to_ID.keys()) + logger.info(f"No judicial officers specified, so scraping all of them: {len(judicial_officers)}") + else: + logger.info(f"Judicial officers were specified, so only scraping these: {judicial_officers}") + return judicial_officers, judicial_officer_to_ID - def scrape_results_page(self, odyssey_version, base_url, search_url, hidden_values, JO_id, date_string, session, logger, ms_wait): - # POST a request for search results - results_page_html = request_page_with_retry( - session=session, - url=search_url + def scrape_results_page( + self, + odyssey_version: int, + base_url: str, + search_url: str, + hidden_values: dict[str, str], + jo_id: str, + date_string: str, + session: requests.sessions.Session, + logger: logging.Logger, + ms_wait: int + ) -> Tuple[str, BeautifulSoup]: + """ + Scrapes the results page based on Odyssey version and search criteria. + + :param odyssey_version: The version of Odyssey to determine the URL and verification text. + :param base_url: The base URL for constructing full URLs. + :param search_url: The URL to request search results from. + :param hidden_values: Dictionary of hidden input values. + :param jo_id: Judicial officer ID for searching. + :param date_string: Date string for searching. + :param session: The session object for making HTTP requests. + :param logger: Logger instance for logging information. + :param ms_wait: Milliseconds to wait before making requests. + :returns: A tuple containing the HTML of the results page and the parsed BeautifulSoup object. + """ + + search_url = ( + search_url if odyssey_version < 2017 - else urllib.parse.urljoin(base_url, "Hearing/SearchHearings/HearingSearch"), - verification_text="Record Count" + else urllib.parse.urljoin(base_url, "Hearing/SearchHearings/HearingSearch") + ) + + verification_text = ( + "Record Count" if odyssey_version < 2017 - else "Search Results", + else "Search Results" + ) + + results_page_html = request_page_with_retry( + session=session, + url=search_url, + verification_text=verification_text, logger=logger, - data=create_search_form_data( - date_string, JO_id, hidden_values, odyssey_version - ), + data=create_search_form_data(date_string, jo_id, hidden_values, odyssey_version), ms_wait=ms_wait, - ) + ) + results_soup = BeautifulSoup(results_page_html, "html.parser") + return results_page_html, results_soup - # Not currently in use. 
Should be moved to a county-specific module, class, and method when a post2017 county is included - """def scrape_case_data_post2017(self, base_url, case_html_path, session, logger, ms_wait): - # Need to POST this page to get a JSON of the search results after the initial POST - case_list_json = request_page_with_retry( - session=session, - url=urllib.parse.urljoin(base_url, "Hearing/HearingResults/Read"), - verification_text="AggregateResults", - logger=logger, - ) - case_list_json = json.loads(case_list_json) - logger.info(f"{case_list_json['Total']} cases found") - for case_json in case_list_json["Data"]: - case_id = str(case_json["CaseId"]) - logger.info(f"{case_id} scraping case") - # make request for the case - case_html = request_page_with_retry( - session=session, - url=urllib.parse.urljoin(base_url, "Case/CaseDetail"), - verification_text="Case Information", - logger=logger, - ms_wait=ms_wait, - params={ - "eid": case_json["EncryptedCaseId"], - "CaseNumber": case_json["CaseNumber"], - }, - ) - # make request for financial info - case_html += request_page_with_retry( - session=session, - url=urllib.parse.urljoin( - base_url, "Case/CaseDetail/LoadFinancialInformation" - ), - verification_text="Financial", - logger=logger, - ms_wait=ms_wait, - params={ - "caseId": case_json["CaseId"], - }, - ) - # write case html data - logger.info(f"{len(case_html)} response string length") - with open( - os.path.join(case_html_path, f"{case_id}.html"), "w" - ) as file_handle: - file_handle.write(case_html)""" - - def scrape_multiple_cases(self, county, odyssey_version, base_url, search_url, hidden_values, judicial_officers, judicial_officer_to_ID, case_html_path, logger, session, ms_wait, start_date, end_date): + def scrape_multiple_cases( + self, + county: str, + odyssey_version: int, + base_url: str, + search_url: str, + hidden_values: Dict[str, str], + judicial_officers: List[str], + judicial_officer_to_ID: Dict[str, str], + case_html_path: Optional[str], + logger: logging.Logger, + session: requests.Session, + ms_wait: int, + start_date: str, + end_date: str + ) -> None: start_date = datetime.strptime(start_date, '%Y-%m-%d').date() end_date = datetime.strptime(end_date, '%Y-%m-%d').date() - # loop through each day - for date in ( - start_date + timedelta(n) - for n in range((end_date - start_date).days + 1) - ): - date_string = datetime.strftime(date, "%m/%d/%Y") - # loop through each judicial officer + + for date in (start_date + timedelta(n) for n in range((end_date - start_date).days + 1)): + date_string = date.strftime("%m/%d/%Y") + for JO_name in judicial_officers: if JO_name not in judicial_officer_to_ID: - logger.error(f"judicial officer {JO_name} not found on search page. Continuing.") + logger.error(f"Judicial officer {JO_name} not found on search page. Continuing.") continue - JO_id = judicial_officer_to_ID[JO_name] + + jo_id = judicial_officer_to_ID[JO_name] logger.info(f"Searching cases on {date_string} for {JO_name}") - # scrapes the results page with the search parameters and returns the soup. 
it also returns the html but it's not used at this time - results_html, results_soup = self.scrape_results_page(odyssey_version, base_url, search_url, hidden_values, JO_id, date_string, session, logger, ms_wait) - # get a different scraper for each county - self.get_class_and_method(county) - # gets the county-specific scraper class and method - scraper_instance, scraper_function = self.get_class_and_method(county=county) - if scraper_instance is not None and scraper_function is not None: - scraper_function(base_url, results_soup, case_html_path, logger, session, ms_wait) - else: - print("Error: Could not obtain parser instance or function.") - - def scrape(self, county, judicial_officers, ms_wait, start_date, end_date, court_calendar_link_text, case_number, case_html_path): - ms_wait, start_date, end_date, court_calendar_link_text, case_number = self.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number) + + results_soup = self.scrape_results_page( + odyssey_version, base_url, search_url, hidden_values, jo_id, date_string, session, logger, ms_wait + ) + + scraper_function = self.get_class_and_method(county, logger) + scraper_function(base_url, results_soup, case_html_path, logger, session, ms_wait) + + def scrape( + self, + county: str, + judicial_officers: List[str], + ms_wait: int, + start_date: str, + end_date: str, + court_calendar_link_text: Optional[str], + case_number: Optional[str], + case_html_path: Optional[str] + ) -> None: + ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path = self.set_defaults( + ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path + ) + logger = self.configure_logger() - county = self.format_county(county) - session = self.create_session() - self.make_directories(county) if not case_html_path else case_html_path + county = self.format_county(county, logger) + session = self.create_session(logger) + + if case_html_path is None: + self.make_directories(county, logger) + base_url, odyssey_version, notes = self.get_ody_link(county, logger) main_page_html, main_soup = self.scrape_main_page(base_url, odyssey_version, session, notes, logger, ms_wait) - search_url, search_page_html, search_soup = self.scrape_search_page(base_url, odyssey_version, main_page_html, main_soup, session, logger, ms_wait, court_calendar_link_text) + search_url, search_page_html, search_soup = self.scrape_search_page( + base_url, odyssey_version, main_page_html, main_soup, session, logger, ms_wait, court_calendar_link_text + ) + hidden_values = self.get_hidden_values(odyssey_version, main_soup, search_soup, logger) - if case_number: # just scrapes the one case - self.scrape_individual_case(base_url, search_url, hidden_values, case_number, case_html_path, session, logger, ms_wait) - else: # scrape a list of JOs between a start and end date - judicial_officers, judicial_officer_to_ID = self.scrape_jo_list(odyssey_version, search_soup, judicial_officers, logger) - SCRAPER_START_TIME = time() - self.scrape_multiple_cases(odyssey_version, base_url, search_url, hidden_values, judicial_officers, judicial_officer_to_ID, case_html_path, logger, session, ms_wait, start_date, end_date) - logger.info(f"\nTime to run script: {round(time() - SCRAPER_START_TIME, 2)} seconds") + + if case_number: + self.scrape_individual_case( + base_url, search_url, hidden_values, case_number, case_html_path, session, logger, ms_wait + ) + else: + judicial_officers, judicial_officer_to_ID = 
self.scrape_jo_list( + odyssey_version, search_soup, judicial_officers, logger + ) + scraper_start_time = time() + self.scrape_multiple_cases( + county, odyssey_version, base_url, search_url, hidden_values, judicial_officers, judicial_officer_to_ID, + case_html_path, logger, session, ms_wait, start_date, end_date + ) + logger.info(f"\nTime to run script: {round(time() - scraper_start_time, 2)} seconds") diff --git a/src/scraper/helpers.py b/src/scraper/helpers.py index f3d48e8..8b29c36 100644 --- a/src/scraper/helpers.py +++ b/src/scraper/helpers.py @@ -5,7 +5,6 @@ from logging import Logger from typing import Dict, Optional, Tuple, Literal from enum import Enum -from datetime import datetime, timezone, timedelta #This is called debug and quit. def write_debug_and_quit( diff --git a/src/scraper/scrapcode_post2017.py b/src/scraper/scrapcode_post2017.py new file mode 100644 index 0000000..713d097 --- /dev/null +++ b/src/scraper/scrapcode_post2017.py @@ -0,0 +1,45 @@ +# Not currently in use. Should be moved to a county-specific module, class, and method when a post2017 county is included +"""def scrape_case_data_post2017(self, base_url, case_html_path, session, logger, ms_wait): + # Need to POST this page to get a JSON of the search results after the initial POST + case_list_json = request_page_with_retry( + session=session, + url=urllib.parse.urljoin(base_url, "Hearing/HearingResults/Read"), + verification_text="AggregateResults", + logger=logger, + ) + case_list_json = json.loads(case_list_json) + logger.info(f"{case_list_json['Total']} cases found") + for case_json in case_list_json["Data"]: + case_id = str(case_json["CaseId"]) + logger.info(f"{case_id} scraping case") + # make request for the case + case_html = request_page_with_retry( + session=session, + url=urllib.parse.urljoin(base_url, "Case/CaseDetail"), + verification_text="Case Information", + logger=logger, + ms_wait=ms_wait, + params={ + "eid": case_json["EncryptedCaseId"], + "CaseNumber": case_json["CaseNumber"], + }, + ) + # make request for financial info + case_html += request_page_with_retry( + session=session, + url=urllib.parse.urljoin( + base_url, "Case/CaseDetail/LoadFinancialInformation" + ), + verification_text="Financial", + logger=logger, + ms_wait=ms_wait, + params={ + "caseId": case_json["CaseId"], + }, + ) + # write case html data + logger.info(f"{len(case_html)} response string length") + with open( + os.path.join(case_html_path, f"{case_id}.html"), "w" + ) as file_handle: + file_handle.write(case_html)""" \ No newline at end of file diff --git a/src/tester/test_unittest.py b/src/tester/test_unittest.py index b096b73..9d4fa52 100644 --- a/src/tester/test_unittest.py +++ b/src/tester/test_unittest.py @@ -1,5 +1,5 @@ -import unittest, sys, os, json, warnings, requests, logging -from datetime import datetime, timezone, timedelta +import unittest, sys, os, json, warnings, logging +from datetime import datetime, timedelta from bs4 import BeautifulSoup current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -29,7 +29,7 @@ def test_scrape_get_ody_link(self, scraper_instance = Scraper() logger = scraper_instance.configure_logger() county = scraper_instance.format_county(county) - base_url = scraper_instance.get_ody_link('hays', logger) + base_url = scraper_instance.get_ody_link(county, logger) self.assertIsNotNone(base_url, "No URL found for this county.") def test_scrape_main_page(self, @@ -40,12 +40,15 @@ def test_scrape_main_page(self, start_date = None, end_date = None, court_calendar_link_text = None, - 
case_number = None + case_number = None, + ssl = True, + case_html_path = None, + county = 'hays' ): scraper_instance = Scraper() logger = scraper_instance.configure_logger() - ms_wait, start_date, end_date, court_calendar_link_text, case_number = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number) - session = scraper_instance.create_session() + ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path) + session = scraper_instance.create_session(logger, ssl) main_page_html, main_soup = scraper_instance.scrape_main_page(base_url, odyssey_version, session, notes, logger, ms_wait) self.assertIsNotNone(main_page_html, "No main page HTML came through. main_page_html = None.") self.assertTrue('ssSearchHyperlink' in main_page_html, "There is no 'ssSearchHyperlink' text found in this main page html.") # Note: This validation is already being done using the 'verification_text' field. @@ -62,7 +65,11 @@ def test_scrape_search_page(self, court_calendar_link_text = None, start_date = None, end_date = None, - case_number = None): + case_number = None, + ssl = True, + case_html_path = None, + county = 'hays' + ): # Open the mocked main page HTML with open( os.path.join(os.path.dirname(__file__), "..", "..", "resources", 'test_files','hays_main_page.html'), "r", encoding='utf-8' @@ -73,8 +80,8 @@ def test_scrape_search_page(self, # Look for the court calendar link scraper_instance = Scraper() logger = scraper_instance.configure_logger() - ms_wait, start_date, end_date, court_calendar_link_text, case_number = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number) - session = scraper_instance.create_session() + ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path) + session = scraper_instance.create_session(logger, ssl) search_url, search_page_html, search_soup = scraper_instance.scrape_search_page(base_url, odyssey_version, main_page_html, main_soup, session, logger, ms_wait, court_calendar_link_text) # Verify the court calendar link self.assertIsNotNone(main_page_html, "No search url came through. 
search_url = None.") @@ -93,7 +100,11 @@ def test_get_hidden_values(self, court_calendar_link_text = None, start_date = None, end_date = None, - case_number = None): + case_number = None, + ssl = True, + case_html_path = None, + county = 'hays' + ): # Open the mocked main page HTML with open( os.path.join(os.path.dirname(__file__), "..", "..", "resources", 'test_files','hays_main_page.html'), "r", encoding='utf-8' @@ -113,7 +124,7 @@ def test_get_hidden_values(self, #Run the function scraper_instance = Scraper() logger = scraper_instance.configure_logger() - ms_wait, start_date, end_date, court_calendar_link_text, case_number = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number) + ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path) hidden_values = scraper_instance.get_hidden_values(odyssey_version, main_soup, search_soup, logger) self.assertIsNotNone(hidden_values, "No hidden values came through. hidden_values = None.") self.assertTrue(type(hidden_values) == dict, "The hidden values fields is not a dictionary but it needs to be.") @@ -130,7 +141,8 @@ def test_scrape_individual_case(self, start_date = None, end_date = None, court_calendar_link_text = None, - case_html_path = os.path.join(os.path.dirname(__file__), "..", "..", "resources", 'test_files', 'test_data', 'hays', "case_html") + case_html_path = os.path.join(os.path.dirname(__file__), "..", "..", "resources", 'test_files', 'test_data', 'hays', "case_html"), + ssl = True, ): # This starts a timer to compare the run start time to the last updated time of the resulting HTML to ensure the HTML was created after run start time now = datetime.now() @@ -140,10 +152,10 @@ def test_scrape_individual_case(self, # Call the functions being tested. In this case, the functions being called are all of the subfunctions required and effectively replicates the shape of scrape. scraper_instance = Scraper() - ms_wait, start_date, end_date, court_calendar_link_text, case_number = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number) + ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path) logger = scraper_instance.configure_logger() county = scraper_instance.format_county(county) - session = scraper_instance.create_session() + session = scraper_instance.create_session(logger, ssl) case_html_path = scraper_instance.make_directories(county) if not case_html_path else case_html_path base_url, odyssey_version, notes = scraper_instance.get_ody_link(county, logger) main_page_html, main_soup = scraper_instance.scrape_main_page(base_url, odyssey_version, session, notes, logger, ms_wait) @@ -190,13 +202,15 @@ def test_scrape_jo_list(self, county = 'hays', session = None, logger = None, + ssl = True, + case_html_path = None, ): # This test requires that certain dependency functions run first. 
scraper_instance = Scraper() - ms_wait, start_date, end_date, court_calendar_link_text, case_number = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number) + ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path) logger = scraper_instance.configure_logger() county = scraper_instance.format_county(county) - session = scraper_instance.create_session() + session = scraper_instance.create_session(logger, ssl) main_page_html, main_soup = scraper_instance.scrape_main_page(base_url, odyssey_version, session, notes, logger, ms_wait) search_url, search_page_html, search_soup = scraper_instance.scrape_search_page(base_url, odyssey_version, main_page_html, main_soup, session, logger, ms_wait, court_calendar_link_text) judicial_officers, judicial_officer_to_ID = scraper_instance.scrape_jo_list(odyssey_version, search_soup, judicial_officers, logger) @@ -219,7 +233,9 @@ def test_scrape_results_page(self, start_date = None, end_date = None, court_calendar_link_text = None, - case_number = None + case_number = None, + ssl = True, + case_html_path = None, ): # Read in the test 'hidden values' that are necessary for searching a case @@ -230,10 +246,10 @@ def test_scrape_results_page(self, hidden_values = hidden_values.replace("'", "\"") hidden_values = json.loads(hidden_values) scraper_instance = Scraper() - ms_wait, start_date, end_date, court_calendar_link_text, case_number = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number) + ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path) logger = scraper_instance.configure_logger() county = scraper_instance.format_county(county) - session = scraper_instance.create_session() + session = scraper_instance.create_session(logger, ssl) # Open the example main page HTML with open( os.path.join(os.path.dirname(__file__), "..", "..", "resources", 'test_files','hays_main_page.html'), "r", encoding='utf-8' @@ -255,23 +271,24 @@ def test_scrape_results_page(self, #def scrape_case_data_post2017() @unittest.skipIf(SKIP_SLOW, "slow") - def test_scrape_multiple_cases(self, - county = 'hays', - odyssey_version = 2003, - base_url = r'http://public.co.hays.tx.us/', - search_url = r'https://public.co.hays.tx.us/Search.aspx?ID=900', - hidden_values = None, - judicial_officers = ['Boyer, Bruce'], - judicial_officer_to_ID = {'Boyer, Bruce':'39607'}, - JO_id = '39607', - date_string = '07-01-2024', - court_calendar_link_text = None, - case_number = None, - ms_wait = 200, - start_date = '2024-07-01', - end_date = '2024-07-01', - case_html_path = os.path.join(os.path.dirname(__file__), "..", "..", "resources", 'test_files', 'test_data', 'hays', "case_html") - ): + def test_scrape_multiple_cases(self, + county = 'hays', + odyssey_version = 2003, + base_url = r'http://public.co.hays.tx.us/', + search_url = r'https://public.co.hays.tx.us/Search.aspx?ID=900', + hidden_values = None, + judicial_officers = ['Boyer, Bruce'], + judicial_officer_to_ID = {'Boyer, Bruce':'39607'}, + JO_id = '39607', + date_string = '07-01-2024', + court_calendar_link_text = None, + case_number = None, + ms_wait = 200, + start_date = '2024-07-01', + end_date = '2024-07-01', + 
case_html_path = os.path.join(os.path.dirname(__file__), "..", "..", "resources", 'test_files', 'test_data', 'hays', "case_html"), + ssl = True, + ): # This starts a timer to compare the run start time to the last updated time of the resulting HTML to ensure the HTML was created after run start time now = datetime.now() @@ -296,9 +313,9 @@ def test_scrape_multiple_cases(self, # There are some live depency functions that have to be run before the primary code can be run. scraper_instance = Scraper() - session = scraper_instance.create_session() - ms_wait, start_date, end_date, court_calendar_link_text, case_number = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number) + ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path = scraper_instance.set_defaults(ms_wait, start_date, end_date, court_calendar_link_text, case_number, ssl, county, case_html_path) logger = scraper_instance.configure_logger() + session = scraper_instance.create_session(logger, ssl) case_html_path = scraper_instance.make_directories(county) if not case_html_path else case_html_path search_url, search_page_html, search_soup = scraper_instance.scrape_search_page(base_url, odyssey_version, main_page_html, main_soup, session, logger, ms_wait, court_calendar_link_text) results_html, results_soup = scraper_instance.scrape_results_page(odyssey_version, base_url, search_url, hidden_values, JO_id, date_string, session, logger, ms_wait)
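
Editor's notes on the diff above. The short sketches that follow are self-contained illustrations of the patterns this patch introduces; they are not part of the patch, and every helper name, sample HTML snippet, URL, and path in them is a placeholder. First, the parser hunk documents why the balance table is decomposed before hashing: balances change as costs are paid, and hashing them would register a "new" version of an otherwise unchanged case. A minimal sketch of that drop-then-hash idea, assuming a hypothetical hash_case_body helper:

# Illustrative sketch (not part of the patch): hash a case page while ignoring
# the balance table, so routine balance updates do not produce a new version.
import hashlib
from bs4 import BeautifulSoup

def hash_case_body(html: str) -> str:
    body = BeautifulSoup(html, "html.parser").find("body")
    tables = body.find_all("table")
    # Drop the last table only if it is the balance table, mirroring the parser logic.
    if tables and "Balance Due" in tables[-1].text:
        tables[-1].decompose()
    return hashlib.sha256(body.get_text().encode("utf-8")).hexdigest()

sample = ("<html><body><table><tr><td>Charge: Speeding</td></tr></table>"
          "<table><tr><td>Balance Due: $50</td></tr></table></body></html>")
updated = sample.replace("$50", "$25")
# Both versions hash identically because only the balance table changed.
assert hash_case_body(sample) == hash_case_body(updated)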
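
set_defaults now tests `is not None` instead of truthiness. A tiny sketch of why that distinction matters, under the assumption that a caller-supplied 0 for ms_wait is a legitimate value:

# Illustrative sketch: truthiness-based defaulting silently overrides falsy but
# valid inputs; an explicit None check preserves them.
def wait_with_truthiness(ms_wait=None):
    return ms_wait if ms_wait else 200            # 0 is falsy, so it is replaced

def wait_with_none_check(ms_wait=None):
    return ms_wait if ms_wait is not None else 200

print(wait_with_truthiness(0))    # 200 - the caller's explicit 0 is lost
print(wait_with_none_check(0))    # 0   - the caller's explicit 0 is kept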
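
create_session now takes an ssl flag instead of hard-coding verify=False. A sketch of one way to wire it, assuming the InsecureRequestWarning suppression is only wanted when verification is actually turned off (the patch currently suppresses it unconditionally):

# Illustrative sketch: build a requests.Session with configurable TLS verification,
# silencing urllib3's warning only when verification is disabled.
import requests
import urllib3

def create_session(ssl: bool = True) -> requests.Session:
    session = requests.Session()
    session.verify = ssl
    if not ssl:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    return session

session = create_session(ssl=False)   # e.g. for a county portal with a bad certificate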
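
get_ody_link keeps the comment "add trailing slash if not present, otherwise urljoin breaks." The slash genuinely changes how relative paths resolve, which is why the portal URL from the CSV is normalized before any urljoin calls. A two-line demonstration with a made-up host:

# Illustrative sketch: why the portal base URL needs a trailing slash before urljoin.
from urllib.parse import urljoin

print(urljoin("http://public.example.us/portal",  "login.aspx"))
# -> http://public.example.us/login.aspx        (the portal segment is dropped)
print(urljoin("http://public.example.us/portal/", "login.aspx"))
# -> http://public.example.us/portal/login.aspx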
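
Several of the new except blocks call `logger.exception(e, "Error ...")`. In the standard logging API the first positional argument is the message and any extra positional arguments are treated as %-format arguments, and exception() already records the active traceback, so passing the exception object first is likely not what's intended. A sketch of the conventional form, offered as a suggestion rather than a required change:

# Illustrative sketch: logger.exception() takes the message first and appends the
# current exception's traceback automatically when called inside an except block.
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("scraper-example")

try:
    open("/path/that/does/not/exist.csv")
except OSError:
    logger.exception("Error getting county-specific information from csv.")
    # re-raise if callers need to handle it, as the diff does:
    # raise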
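
get_class_and_method derives the module, class, and method names from the county (hays -> ScraperHays.scraper_hays) and now raises instead of returning None tuples. A sketch of that lookup pattern with the same hypothetical naming convention, showing how failures surface to the caller:

# Illustrative sketch: resolve a county-specific scraper class and method by name.
# The module/class/method names follow the convention described in the diff
# ('hays', 'ScraperHays', 'scraper_hays') and are hypothetical here.
import importlib

def load_county_scraper(county: str):
    module_name = county
    class_name = f"Scraper{county.capitalize()}"
    method_name = f"scraper_{county}"
    module = importlib.import_module(module_name)        # raises ImportError if missing
    cls = getattr(module, class_name, None)
    if cls is None:
        raise AttributeError(f"{class_name} not found in {module_name}")
    instance = cls()
    method = getattr(instance, method_name, None)
    if method is None:
        raise AttributeError(f"{method_name} not found in {class_name}")
    return instance, method

try:
    load_county_scraper("hays")
except (ImportError, AttributeError) as err:
    print(f"could not load county scraper: {err}")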
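
scrape_search_page now breaks out of the link loop once the court calendar link is found and raises ValueError when it is not. A compact sketch of the ID extraction, run against made-up markup rather than a live portal:

# Illustrative sketch: pull the search page ID out of the court calendar link.
from bs4 import BeautifulSoup

main_page_html = """<a class="ssSearchHyperlink"
    href="javascript:window.open('Search.aspx?ID=900')">Court Calendar</a>"""
main_soup = BeautifulSoup(main_page_html, "html.parser")

search_page_id = None
for link in main_soup.select("a.ssSearchHyperlink"):
    if "Court Calendar" in link.text:
        search_page_id = link["href"].split("?ID=")[1].split("'")[0]
        break
if not search_page_id:
    raise ValueError("Court Calendar link not found on the main page.")

search_url = f"http://public.example.us/Search.aspx?ID={search_page_id}"   # ...?ID=900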
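
get_hidden_values collects every named hidden input into a dictionary that is later merged into the search POST. The same dict comprehension, run against a couple of stand-in inputs:

# Illustrative sketch: gather the hidden form fields needed to POST a search.
from bs4 import BeautifulSoup

search_soup = BeautifulSoup(
    '<input type="hidden" name="__VIEWSTATE" value="abc123">'
    '<input type="hidden" name="Settings.DefaultLocation" value="Hays County">'
    '<input type="hidden" value="ignored-because-it-has-no-name">',
    "html.parser",
)
hidden_values = {
    hidden["name"]: hidden["value"]
    for hidden in search_soup.select('input[type="hidden"]')
    if hidden.has_attr("name")
}
print(hidden_values)   # {'__VIEWSTATE': 'abc123', 'Settings.DefaultLocation': 'Hays County'}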
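
Finally, scrape_multiple_cases walks every day between start_date and end_date inclusive, formatting each as MM/DD/YYYY for the hearing search form. The generator it uses, shown in isolation with sample dates:

# Illustrative sketch: iterate each calendar day in the inclusive scrape window.
from datetime import datetime, timedelta

start_date = datetime.strptime("2024-07-01", "%Y-%m-%d").date()
end_date = datetime.strptime("2024-07-03", "%Y-%m-%d").date()

for date in (start_date + timedelta(n) for n in range((end_date - start_date).days + 1)):
    print(date.strftime("%m/%d/%Y"))   # 07/01/2024, 07/02/2024, 07/03/2024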