From c4f36e44389c62e1f1d28bf5b08940753a7701f6 Mon Sep 17 00:00:00 2001 From: Jason Lin Date: Sun, 22 Sep 2024 11:15:48 -0500 Subject: [PATCH 1/5] changes so far needs fixing --- src/cleaner/__init__.py | 189 +++++++++++++++++++++++-------------- src/cleaner/run_cleaner.py | 6 ++ 2 files changed, 125 insertions(+), 70 deletions(-) create mode 100644 src/cleaner/run_cleaner.py diff --git a/src/cleaner/__init__.py b/src/cleaner/__init__.py index 07060d1..3c1b1cf 100644 --- a/src/cleaner/__init__.py +++ b/src/cleaner/__init__.py @@ -5,30 +5,117 @@ class Cleaner: - def __init__(self, county): - self.county = county.lower() + # commented out for now and maybe remove as it may not be needed anymore? + # def __init__(self, county): + # self.county = county.lower() - def add_parsing_date(self, input_dict: dict, out_file: dict) -> dict: + @staticmethod + def add_parsing_date(input_dict: dict, out_file: dict) -> dict: # removed self from args and added @staticmethod # This will add the date of parsing to the final cleaned json file today_date = dt.datetime.today().strftime('%Y-%m-%d') out_file['parsing_date'] = today_date return out_file - - def clean(self): + + @staticmethod + def ensure_folder_exists(folder_path): # removed self from args and added @staticmethod + # Checks if the output folder exists + if not os.path.exists(folder_path): + # Create the folder if it doesn't exist + os.makedirs(folder_path) + print(f"Folder '{folder_path}' created successfully.") + else: + print(f"Folder '{folder_path}' already exists.") + + @staticmethod + def load_json_file(file_path): # removed self from args and added @staticmethod + """Loads a JSON file from a given file path and returns the data as an object""" + with open(file_path, "r") as f: + return json.load(f) + + @staticmethod + def map_charge_names(charge_data): # removed self from args and added @staticmethod + """Creates a dictionary mapping charge names to their corresponding UMich data""" + charge_mapping = {} + for item in charge_data: + charge_mapping[item['charge_name']] = item + return charge_mapping + + @staticmethod + def process_charges(charges, charge_mapping): # removed self from args and added @staticmethod + """ + Processes a list of charges by formatting charge details, + mapping charges to UMich data, and finding the earliest charge date. + + Args: + charges (list): A list of charges where each charge is a dictionary containing charge details. + charge_mapping (dict): A dictionary mapping charge names to corresponding UMich data. + + Returns: + tuple: A list of processed charges and the earliest charge date. + """ + charge_dates = [] + processed_charges = [] + + for i, charge in enumerate(charges): + charge_dict = { + "charge_id": i, + "charge_level": charge["level"], + "orignal_charge": charge["charges"], + "statute": charge["statute"], + "is_primary_charge": i == 0, + } + + # Parse the charge date and append it to charge_dates + charge_datetime = dt.datetime.strptime(charge["date"], "%m/%d/%Y") + charge_dates.append(charge_datetime) + charge_dict["charge_date"] = dt.datetime.strftime(charge_datetime, "%Y-%m-%d") + + # Try to map the charge to UMich data + try: + charge_dict.update(charge_mapping[charge["charges"]]) + except KeyError: + print(f"Couldn't find this charge: {charge['charges']}") + pass + + processed_charges.append(charge_dict) + + # Find the earliest charge date + earliest_charge_date = dt.datetime.strftime(min(charge_dates), "%Y-%m-%d") + + return processed_charges, earliest_charge_date + + @staticmethod + def contains_good_motion(motion, event): # removed self from args and added @staticmethod + """Recursively check if a motion exists in an event list or sublist.""" + if isinstance(event, list): + return any(Cleaner.contains_good_motion(motion, item) for item in event) #changed self.contains_good_motion to Cleaner.contains_good_motion + return motion.lower() in event.lower() + + @staticmethod + def find_good_motions(events, good_motions): # removed self from args and added @staticmethod + """Finds motions in events based on list of good motions.""" + motions_in_events = [ + motion for motion in good_motions if Cleaner.contains_good_motion(motion, events) #changed self.contains_good_motion to Cleaner.contains_good_motion + ] + return motions_in_events + + @staticmethod + def write_json_output(file_path, data): # removed self from args and added @staticmethod + """Writes the given data to a JSON file at the specified file path.""" + with open(file_path, "w") as f: + json.dump(data, f) + + @staticmethod # removed self from args and added @staticmethod + def clean(county): # added county as argument to not rely on self.county case_json_folder_path = os.path.join( - os.path.dirname(__file__), "..", "..", "data", self.county, "case_json" + os.path.dirname(__file__), "..", "..", "data", county.lower(), "case_json" #changed self.county to county.lower() ) case_json_cleaned_folder_path = os.path.join( - os.path.dirname(__file__), "..", "..", "data", self.county, "case_json_cleaned" + os.path.dirname(__file__), "..", "..", "data", county.lower(), "case_json_cleaned" #changed self.county to county.lower() ) - # Checks if the output folder exists - if not os.path.exists(case_json_cleaned_folder_path): - # Create the folder if it doesn't exist - os.makedirs(case_json_cleaned_folder_path) - print(f"Folder '{case_json_cleaned_folder_path}' created successfully.") - else: - print(f"Folder '{case_json_cleaned_folder_path}' already exists.") + # Checks that the folder exists, if not it will be created + Cleaner.ensure_folder_exists(case_json_cleaned_folder_path) # Call method using Cleaner class name instead of self list_case_json_files = os.listdir(case_json_folder_path) for case_json in list_case_json_files: @@ -45,27 +132,22 @@ def clean(self): ] # Original Format - in_file = case_json_folder_path + "\\" + case_json - with open(in_file, "r") as f: - input_dict = json.load(f) + # in_file = case_json_folder_path + "\\" + case_json + in_file = os.path.join(case_json_folder_path, case_json) # Replaced original concatenation with this. Should help prevent problems with different OS path separators + + input_dict = Cleaner.load_json_file(in_file) # Call method using Cleaner class name instead of self #(f"input_dict: {input_dict}") # Get mappings of charge names to umich decsriptions charge_name_to_umich_file = os.path.join( - os.path.dirname(__file__),"..", "..", "resources", "umich-uccs-database.json" + os.path.dirname(__file__),"..", "..", "resources", "umich-uccs-database.json" ) - with open(charge_name_to_umich_file, "r") as f: - charge_name_to_umich = json.load(f) - #print(f"input_dict: {charge_name_to_umich}") + # ADD VARIABLE FOR Cleaner.load_json_file(charge_name_to_umich_file - charge_name_to_umich_dict = {} - for item in charge_name_to_umich: - # Assuming each item is a dictionary with 'charges' as a key - charge_name = item['charge_name'] - charge_name_to_umich_dict[charge_name] = item + charge_name_to_umich = Cleaner.map_charge_names(Cleaner.load_json_file(charge_name_to_umich_file)) # Call method using Cleaner class name instead of self + #print(f"input_dict: {charge_name_to_umich}") - charge_name_to_umich = charge_name_to_umich_dict # Cleaned Case Primary format out_file = {} out_file["case_number"] = input_dict["code"] #Note: This may be closed to personally identifying information of the defendant. @@ -75,58 +157,25 @@ def clean(self): out_file["html_hash"] = input_dict["html_hash"] # Create charges list - charge_dates = [] - out_file["charges"] = [] - for i, charge in enumerate(input_dict["charge information"]): - charge_dict = { - "charge_id": i, - "charge_level": charge["level"], - "orignal_charge": charge["charges"], - "statute": charge["statute"], - "is_primary_charge": i == 0, # True if this is the first charge - } - charge_datetime = dt.datetime.strptime(charge["date"], "%m/%d/%Y") - charge_dates.append(charge_datetime) - charge_dict["charge_date"] = dt.datetime.strftime(charge_datetime, "%Y-%m-%d") - # Umichigan mapping - try: - charge_dict.update(charge_name_to_umich[charge["charges"]]) - except KeyError as KeyErrorCharge: - print(f"Couldn't find this charge: {KeyErrorCharge}") - pass - - out_file["charges"].append(charge_dict) - out_file["earliest_charge_date"] = dt.datetime.strftime(min(charge_dates), "%Y-%m-%d") - - def contains_good_motion(motion, event): - """Recursively check if a motion exists in an event list or sublist.""" - if isinstance(event, list): - return any(contains_good_motion(motion, item) for item in event) - return motion.lower() in event.lower() - - # Iterate through every event and see if one of our "good motions" is in it - motions_in_events = [ - motion - for motion in good_motions - if contains_good_motion(motion, input_dict["other events and hearings"]) - ] - out_file["motions"] = motions_in_events - out_file["has_evidence_of_representation"] = len(motions_in_events) > 0 + out_file["charges"], out_file["earliest_charge_date"] = Cleaner.process_charges(input_dict["charge information"], charge_name_to_umich) # Call method using Cleaner class name instead of self + + # Stores list of motions from good_motions that exist inside input_dict["other events and hearings"] + out_file["motions"] = Cleaner.find_good_motions(input_dict["other events and hearings"], good_motions) # Call method using Cleaner class name instead of self + # Sets boolean based on whether any good motions were found + out_file["has_evidence_of_representation"] = len(out_file["motions"]) > 0 # This adds a hash of the unique string per defense attorney that matches this format: 'defense attorney name:defense atttorney phone number'. # This will conceal the defense attorney but keep a unique idenfier to link defense attorney between cases. def_atty_unique_str = input_dict["party information"]["defense attorney"] + ':' + input_dict["party information"]["defense attorney phone number"] def_atty_hash = xxhash.xxh64(str(def_atty_unique_str)).hexdigest() - out_file["defense attorney"] = def_atty_hash + out_file["defense_attorney"] = def_atty_hash # added underscore in defense_attorney for consistency # This adds the date of parsing to the final cleaned json - out_file = self.add_parsing_date(input_dict, out_file) + out_file = Cleaner.add_parsing_date(input_dict, out_file) # Call method using Cleaner class name instead of self # Original Format out_filepath = os.path.join( - os.path.dirname(__file__), "..", "..", "data", self.county, "case_json_cleaned",case_json + os.path.dirname(__file__), "..", "..", "data", county.lower(), "case_json_cleaned",case_json # changed self.county to county.lower() ) - with open(out_filepath, "w") as f: - json.dump(out_file, f) - + Cleaner.write_json_output(out_filepath, out_file) # Call method using Cleaner class name instead of self diff --git a/src/cleaner/run_cleaner.py b/src/cleaner/run_cleaner.py new file mode 100644 index 0000000..5b9f64b --- /dev/null +++ b/src/cleaner/run_cleaner.py @@ -0,0 +1,6 @@ +# Test script for testing cleaner +from cleaner import Cleaner + +clean_instance = Cleaner() +county = 'hays' +clean_instance.clean(county) From 9cd5ce598430446d90878c5f4534a53c2669bb41 Mon Sep 17 00:00:00 2001 From: Jason Lin Date: Mon, 23 Sep 2024 16:34:02 -0500 Subject: [PATCH 2/5] broke down cleaner method further to individual methods. changed variable names for more clarity --- src/cleaner/__init__.py | 195 ++++++++++++++++++++++------------------ 1 file changed, 110 insertions(+), 85 deletions(-) diff --git a/src/cleaner/__init__.py b/src/cleaner/__init__.py index 3c1b1cf..b5a8d61 100644 --- a/src/cleaner/__init__.py +++ b/src/cleaner/__init__.py @@ -9,31 +9,51 @@ class Cleaner: # def __init__(self, county): # self.county = county.lower() + GOOD_MOTIONS = [ + "Motion To Suppress", + "Motion to Reduce Bond", + "Motion to Reduce Bond Hearing", + "Motion for Production", + "Motion For Speedy Trial", + "Motion for Discovery", + "Motion In Limine", + ] + @staticmethod - def add_parsing_date(input_dict: dict, out_file: dict) -> dict: # removed self from args and added @staticmethod + def add_parsing_date(input_dict: dict, output_json_data: dict) -> dict: # This will add the date of parsing to the final cleaned json file today_date = dt.datetime.today().strftime('%Y-%m-%d') - out_file['parsing_date'] = today_date - return out_file - + output_json_data['parsing_date'] = today_date + return output_json_data + @staticmethod - def ensure_folder_exists(folder_path): # removed self from args and added @staticmethod - # Checks if the output folder exists + def get_folder_path(county, folder_type): + """Returns the path for the specified folder type ('case_json' or 'case_json_cleaned').""" + return os.path.join(os.path.dirname(__file__), "..", "..", "data", county.lower(), folder_type) + + @staticmethod + def ensure_folder_exists(folder_path): + """Checks if the output folder exists and creates it if it doesn't""" if not os.path.exists(folder_path): - # Create the folder if it doesn't exist os.makedirs(folder_path) print(f"Folder '{folder_path}' created successfully.") else: print(f"Folder '{folder_path}' already exists.") @staticmethod - def load_json_file(file_path): # removed self from args and added @staticmethod + def get_and_ensure_folder(county, folder_type): + """Gets the folder path and ensures that the folder exists as this folder will contain the cleaned json files.""" + folder_path = Cleaner.get_folder_path(county, folder_type) + Cleaner.ensure_folder_exists(folder_path) + + @staticmethod + def load_json_file(file_path): """Loads a JSON file from a given file path and returns the data as an object""" with open(file_path, "r") as f: return json.load(f) @staticmethod - def map_charge_names(charge_data): # removed self from args and added @staticmethod + def map_charge_names(charge_data): """Creates a dictionary mapping charge names to their corresponding UMich data""" charge_mapping = {} for item in charge_data: @@ -41,7 +61,14 @@ def map_charge_names(charge_data): # removed self from args and added @staticmet return charge_mapping @staticmethod - def process_charges(charges, charge_mapping): # removed self from args and added @staticmethod + def load_and_map_charge_names(file_path): + """Loads a JSON file and maps charge names to their corresponding UMich data.""" + charge_data = Cleaner.load_json_file(file_path) + return Cleaner.map_charge_names(charge_data) + + + @staticmethod + def process_charges(charges, charge_mapping): """ Processes a list of charges by formatting charge details, mapping charges to UMich data, and finding the earliest charge date. @@ -83,99 +110,97 @@ def process_charges(charges, charge_mapping): # removed self from args and added earliest_charge_date = dt.datetime.strftime(min(charge_dates), "%Y-%m-%d") return processed_charges, earliest_charge_date - + @staticmethod - def contains_good_motion(motion, event): # removed self from args and added @staticmethod + def contains_good_motion(motion, event): """Recursively check if a motion exists in an event list or sublist.""" if isinstance(event, list): - return any(Cleaner.contains_good_motion(motion, item) for item in event) #changed self.contains_good_motion to Cleaner.contains_good_motion + return any(Cleaner.contains_good_motion(motion, item) for item in event) return motion.lower() in event.lower() @staticmethod - def find_good_motions(events, good_motions): # removed self from args and added @staticmethod + def find_good_motions(events, good_motions): """Finds motions in events based on list of good motions.""" motions_in_events = [ - motion for motion in good_motions if Cleaner.contains_good_motion(motion, events) #changed self.contains_good_motion to Cleaner.contains_good_motion + motion for motion in good_motions if Cleaner.contains_good_motion(motion, events) ] return motions_in_events @staticmethod - def write_json_output(file_path, data): # removed self from args and added @staticmethod + def hash_defense_attorney(input_dict): + """Hashes the defense attorney info to anonymize it.""" + def_atty_unique_str = f'{input_dict["party information"]["defense attorney"]}:{input_dict["party information"]["defense attorney phone number"]}' + return xxhash.xxh64(def_atty_unique_str).hexdigest() + + + @staticmethod + def write_json_output(file_path, data): """Writes the given data to a JSON file at the specified file path.""" with open(file_path, "w") as f: json.dump(data, f) - @staticmethod # removed self from args and added @staticmethod - def clean(county): # added county as argument to not rely on self.county + @staticmethod + def process_single_case(county, case_json_folder_path, case_json_filename): + """Process a single case JSON file.""" + input_json_path = os.path.join(case_json_folder_path, case_json_filename) + input_dict = Cleaner.load_json_file(input_json_path) + + # Initialize cleaned output data + output_json_data = { + "case_number": input_dict["code"], + "attorney_type": input_dict["party information"]["appointed or retained"], + "county": input_dict["county"], + "html_hash": input_dict["html_hash"], + "charges": [], + "earliest_charge_date": "", + "motions": [], + "has_evidence_of_representation": False, + "defense_attorney": Cleaner.hash_defense_attorney(input_dict) + } + + # Load charge mappings + charge_name_to_umich_file = os.path.join( + os.path.dirname(__file__), "..", "..", "resources", "umich-uccs-database.json" + ) + charges_mapped = Cleaner.load_and_map_charge_names(charge_name_to_umich_file) - case_json_folder_path = os.path.join( - os.path.dirname(__file__), "..", "..", "data", county.lower(), "case_json" #changed self.county to county.lower() + # Process charges and motions + output_json_data["charges"], output_json_data["earliest_charge_date"] = Cleaner.process_charges( + input_dict["charge information"], charges_mapped + ) + output_json_data["motions"] = Cleaner.find_good_motions( + input_dict["other events and hearings"], Cleaner.GOOD_MOTIONS ) - case_json_cleaned_folder_path = os.path.join( - os.path.dirname(__file__), "..", "..", "data", county.lower(), "case_json_cleaned" #changed self.county to county.lower() + output_json_data["has_evidence_of_representation"] = len(output_json_data["motions"]) > 0 + + # Add parsing date + output_json_data = Cleaner.add_parsing_date(input_dict, output_json_data) + + # Write output to file + output_filepath = os.path.join( + os.path.dirname(__file__), "..", "..", "data", county.lower(), "case_json_cleaned", case_json_filename ) - # Checks that the folder exists, if not it will be created - Cleaner.ensure_folder_exists(case_json_cleaned_folder_path) # Call method using Cleaner class name instead of self + Cleaner.write_json_output(output_filepath, output_json_data) + @staticmethod + def process_json_files(county, case_json_folder_path): + """Processes all JSON files in the specified folder.""" list_case_json_files = os.listdir(case_json_folder_path) - for case_json in list_case_json_files: - print(case_json) - # List of motions identified as evidenciary - good_motions = [ - "Motion To Suppress", - "Motion to Reduce Bond", - "Motion to Reduce Bond Hearing", - "Motion for Production", - "Motion For Speedy Trial", - "Motion for Discovery", - "Motion In Limine", - ] - - # Original Format - # in_file = case_json_folder_path + "\\" + case_json - in_file = os.path.join(case_json_folder_path, case_json) # Replaced original concatenation with this. Should help prevent problems with different OS path separators - - input_dict = Cleaner.load_json_file(in_file) # Call method using Cleaner class name instead of self - #(f"input_dict: {input_dict}") - - # Get mappings of charge names to umich decsriptions - charge_name_to_umich_file = os.path.join( - os.path.dirname(__file__),"..", "..", "resources", "umich-uccs-database.json" - ) - - # ADD VARIABLE FOR Cleaner.load_json_file(charge_name_to_umich_file - - charge_name_to_umich = Cleaner.map_charge_names(Cleaner.load_json_file(charge_name_to_umich_file)) # Call method using Cleaner class name instead of self - #print(f"input_dict: {charge_name_to_umich}") - - # Cleaned Case Primary format - out_file = {} - out_file["case_number"] = input_dict["code"] #Note: This may be closed to personally identifying information of the defendant. - out_file["attorney_type"] = input_dict["party information"]["appointed or retained"] - #Adding the county and hash values into the final version. - out_file["county"] = input_dict["county"] - out_file["html_hash"] = input_dict["html_hash"] - - # Create charges list - out_file["charges"], out_file["earliest_charge_date"] = Cleaner.process_charges(input_dict["charge information"], charge_name_to_umich) # Call method using Cleaner class name instead of self - - # Stores list of motions from good_motions that exist inside input_dict["other events and hearings"] - out_file["motions"] = Cleaner.find_good_motions(input_dict["other events and hearings"], good_motions) # Call method using Cleaner class name instead of self - # Sets boolean based on whether any good motions were found - out_file["has_evidence_of_representation"] = len(out_file["motions"]) > 0 - - # This adds a hash of the unique string per defense attorney that matches this format: 'defense attorney name:defense atttorney phone number'. - # This will conceal the defense attorney but keep a unique idenfier to link defense attorney between cases. - def_atty_unique_str = input_dict["party information"]["defense attorney"] + ':' + input_dict["party information"]["defense attorney phone number"] - def_atty_hash = xxhash.xxh64(str(def_atty_unique_str)).hexdigest() - out_file["defense_attorney"] = def_atty_hash # added underscore in defense_attorney for consistency - - # This adds the date of parsing to the final cleaned json - out_file = Cleaner.add_parsing_date(input_dict, out_file) # Call method using Cleaner class name instead of self - - # Original Format - out_filepath = os.path.join( - os.path.dirname(__file__), "..", "..", "data", county.lower(), "case_json_cleaned",case_json # changed self.county to county.lower() - ) - - Cleaner.write_json_output(out_filepath, out_file) # Call method using Cleaner class name instead of self + for case_json_filename in list_case_json_files: + Cleaner.process_single_case(county, case_json_folder_path, case_json_filename) + + @staticmethod + def clean(county): + """ + Cleans and processes case data for a given county. + This method performs the following steps: + 1. Loads raw JSON case data from the 'case_json' folder for the specified county. + 2. Processes and maps charges using an external UMich data source. + 3. Identifies relevant motions from a predefined list of good motions. + 4. Hashes defense attorney information to anonymize but uniquely identify the attorney. + 5. Adds metadata, such as parsing date and case number, to the cleaned data. + 6. Writes the cleaned data to the 'case_json_cleaned' folder for the specified county. + """ + case_json_folder_path = Cleaner.get_folder_path(county, "case_json") + Cleaner.get_and_ensure_folder(county, "case_json_cleaned") + Cleaner.process_json_files(county, case_json_folder_path) From d3af4af6143a653114a46debdced22814a7e2765 Mon Sep 17 00:00:00 2001 From: Jason Lin Date: Thu, 26 Sep 2024 17:32:22 -0500 Subject: [PATCH 3/5] changed back to instance methods & additional updates --- src/cleaner/__init__.py | 215 ++++++++++++++++++++++------------------ 1 file changed, 119 insertions(+), 96 deletions(-) diff --git a/src/cleaner/__init__.py b/src/cleaner/__init__.py index b5a8d61..267d2a5 100644 --- a/src/cleaner/__init__.py +++ b/src/cleaner/__init__.py @@ -2,12 +2,12 @@ import os import datetime as dt import xxhash +import logging -class Cleaner: +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - # commented out for now and maybe remove as it may not be needed anymore? - # def __init__(self, county): - # self.county = county.lower() +class Cleaner: GOOD_MOTIONS = [ "Motion To Suppress", @@ -19,63 +19,64 @@ class Cleaner: "Motion In Limine", ] - @staticmethod - def add_parsing_date(input_dict: dict, output_json_data: dict) -> dict: - # This will add the date of parsing to the final cleaned json file - today_date = dt.datetime.today().strftime('%Y-%m-%d') - output_json_data['parsing_date'] = today_date - return output_json_data - - @staticmethod - def get_folder_path(county, folder_type): - """Returns the path for the specified folder type ('case_json' or 'case_json_cleaned').""" - return os.path.join(os.path.dirname(__file__), "..", "..", "data", county.lower(), folder_type) - - @staticmethod - def ensure_folder_exists(folder_path): - """Checks if the output folder exists and creates it if it doesn't""" - if not os.path.exists(folder_path): - os.makedirs(folder_path) - print(f"Folder '{folder_path}' created successfully.") - else: - print(f"Folder '{folder_path}' already exists.") - - @staticmethod - def get_and_ensure_folder(county, folder_type): - """Gets the folder path and ensures that the folder exists as this folder will contain the cleaned json files.""" - folder_path = Cleaner.get_folder_path(county, folder_type) - Cleaner.ensure_folder_exists(folder_path) - - @staticmethod - def load_json_file(file_path): + def __init__(self): + pass + + def add_parsing_date(self, output_json_data: dict) -> dict: + """Adds the date of parsing to the final cleaned json file""" + output_json_data['parsing_date'] = dt.datetime.today().strftime('%Y-%m-%d') + return output_json_data + + def get_or_create_folder_path(self, county: str, folder_type: str) -> str: + """Returns and ensures the existence of the folder path.""" + folder_path = os.path.join(os.path.dirname(__file__), "..", "..", "data", county.lower(), folder_type) + try: + if not os.path.exists(folder_path): + os.makedirs(folder_path) + logging.info(f"Folder '{folder_path}' created successfully.") + else: + logging.info(f"Folder '{folder_path}' already exists.") + except OSError as e: + logging.error(f"Error creating folder '{folder_path}': {e}") + return folder_path + + def load_json_file(self, file_path: str) -> dict: """Loads a JSON file from a given file path and returns the data as an object""" - with open(file_path, "r") as f: - return json.load(f) - - @staticmethod - def map_charge_names(charge_data): + try: + with open(file_path, "r") as f: + return json.load(f) + except (FileNotFoundError, json.JSONDecodeError) as e: + logging.error(f"Error loading file at {file_path}: {e}") + return {} + + def map_charge_names(self, charge_data: list[dict]) -> dict: """Creates a dictionary mapping charge names to their corresponding UMich data""" - charge_mapping = {} - for item in charge_data: - charge_mapping[item['charge_name']] = item - return charge_mapping + return {item['charge_name']: item for item in charge_data} - @staticmethod - def load_and_map_charge_names(file_path): + def load_and_map_charge_names(self, file_path: str) -> dict: """Loads a JSON file and maps charge names to their corresponding UMich data.""" - charge_data = Cleaner.load_json_file(file_path) - return Cleaner.map_charge_names(charge_data) + charge_data = self.load_json_file(file_path) + # Check if the file loaded successfully + if not charge_data: + logging.error(f"Failed to load charge data from {file_path}") + raise FileNotFoundError(f"File not found or is empty: {file_path}") + # Create charge mapping from data + try: + charge_mapping = self.map_charge_names(charge_data) + except KeyError as e: + logging.error(f"Error in mapping charge names: {e}") + raise ValueError(f"Invalid data structure: {file_path}") + return charge_mapping - @staticmethod - def process_charges(charges, charge_mapping): + def process_charges(self, charges: list[dict], charge_mapping: dict) -> tuple[list[dict], str]: """ Processes a list of charges by formatting charge details, mapping charges to UMich data, and finding the earliest charge date. Args: - charges (list): A list of charges where each charge is a dictionary containing charge details. - charge_mapping (dict): A dictionary mapping charge names to corresponding UMich data. + charges: A list of charges where each charge is a dictionary containing charge details. + charge_mapping: A dictionary mapping charge names to corresponding UMich data. Returns: tuple: A list of processed charges and the earliest charge date. @@ -93,57 +94,69 @@ def process_charges(charges, charge_mapping): } # Parse the charge date and append it to charge_dates - charge_datetime = dt.datetime.strptime(charge["date"], "%m/%d/%Y") - charge_dates.append(charge_datetime) - charge_dict["charge_date"] = dt.datetime.strftime(charge_datetime, "%Y-%m-%d") + try: + charge_datetime = dt.datetime.strptime(charge["date"], "%m/%d/%Y") + charge_dates.append(charge_datetime) + charge_dict["charge_date"] = dt.datetime.strftime(charge_datetime, "%Y-%m-%d") + except ValueError: + logging.error(f"Error parsing date for charge: {charge}") + continue # Try to map the charge to UMich data try: charge_dict.update(charge_mapping[charge["charges"]]) except KeyError: - print(f"Couldn't find this charge: {charge['charges']}") - pass + logging.warning(f"Couldn't find this charge: {charge['charges']}") + continue processed_charges.append(charge_dict) # Find the earliest charge date - earliest_charge_date = dt.datetime.strftime(min(charge_dates), "%Y-%m-%d") + if charge_dates: + earliest_charge_date = dt.datetime.strftime(min(charge_dates), "%Y-%m-%d") + else: + logging.warning("No valid charge dates found.") + earliest_charge_date = "" return processed_charges, earliest_charge_date - @staticmethod - def contains_good_motion(motion, event): + def contains_good_motion(self, motion: str, event: list | str) -> bool: """Recursively check if a motion exists in an event list or sublist.""" if isinstance(event, list): - return any(Cleaner.contains_good_motion(motion, item) for item in event) + return any(self.contains_good_motion(motion, item) for item in event) return motion.lower() in event.lower() - @staticmethod - def find_good_motions(events, good_motions): + def find_good_motions(self, events: list | str, good_motions: list[str]) -> list[str]: """Finds motions in events based on list of good motions.""" - motions_in_events = [ - motion for motion in good_motions if Cleaner.contains_good_motion(motion, events) - ] - return motions_in_events + return [motion for motion in good_motions if self.contains_good_motion(motion, events)] - @staticmethod - def hash_defense_attorney(input_dict): + def hash_defense_attorney(self, input_dict: dict) -> str: """Hashes the defense attorney info to anonymize it.""" - def_atty_unique_str = f'{input_dict["party information"]["defense attorney"]}:{input_dict["party information"]["defense attorney phone number"]}' - return xxhash.xxh64(def_atty_unique_str).hexdigest() + try: + def_atty_unique_str = f'{input_dict["party information"]["defense attorney"]}:{input_dict["party information"]["defense attorney phone number"]}' + return xxhash.xxh64(def_atty_unique_str).hexdigest() + except KeyError as e: + logging.error(f"Missing defense attorney data: {e}") + return "" - @staticmethod - def write_json_output(file_path, data): + def write_json_output(self, file_path: str, data: dict) -> None: """Writes the given data to a JSON file at the specified file path.""" - with open(file_path, "w") as f: - json.dump(data, f) - - @staticmethod - def process_single_case(county, case_json_folder_path, case_json_filename): + try: + with open(file_path, "w") as f: + json.dump(data, f) + logging.info(f"Successfully wrote cleaned data to {file_path}") + except OSError as e: + logging.error(f"Failed to write JSON output to {file_path}: {e}") + + def process_single_case(self, county: str, case_json_folder_path: str, case_json_filename:str) -> None: """Process a single case JSON file.""" input_json_path = os.path.join(case_json_folder_path, case_json_filename) - input_dict = Cleaner.load_json_file(input_json_path) + input_dict = self.load_json_file(input_json_path) + + if not input_dict: + logging.error(f"Failed to load case data from {input_json_path}") + return # Initialize cleaned output data output_json_data = { @@ -155,42 +168,48 @@ def process_single_case(county, case_json_folder_path, case_json_filename): "earliest_charge_date": "", "motions": [], "has_evidence_of_representation": False, - "defense_attorney": Cleaner.hash_defense_attorney(input_dict) + "defense_attorney": self.hash_defense_attorney(input_dict) } # Load charge mappings charge_name_to_umich_file = os.path.join( os.path.dirname(__file__), "..", "..", "resources", "umich-uccs-database.json" ) - charges_mapped = Cleaner.load_and_map_charge_names(charge_name_to_umich_file) + charges_mapped = self.load_and_map_charge_names(charge_name_to_umich_file) # Process charges and motions - output_json_data["charges"], output_json_data["earliest_charge_date"] = Cleaner.process_charges( + output_json_data["charges"], output_json_data["earliest_charge_date"] = self.process_charges( input_dict["charge information"], charges_mapped ) - output_json_data["motions"] = Cleaner.find_good_motions( - input_dict["other events and hearings"], Cleaner.GOOD_MOTIONS + output_json_data["motions"] = self.find_good_motions( + input_dict["other events and hearings"], self.GOOD_MOTIONS ) output_json_data["has_evidence_of_representation"] = len(output_json_data["motions"]) > 0 # Add parsing date - output_json_data = Cleaner.add_parsing_date(input_dict, output_json_data) + output_json_data = self.add_parsing_date(output_json_data) + + # Ensure the case_json_cleaned folder exists + cleaned_folder_path = self.get_or_create_folder_path(county, "case_json_cleaned") # Write output to file - output_filepath = os.path.join( - os.path.dirname(__file__), "..", "..", "data", county.lower(), "case_json_cleaned", case_json_filename - ) - Cleaner.write_json_output(output_filepath, output_json_data) + output_filepath = os.path.join(cleaned_folder_path, case_json_filename) + self.write_json_output(output_filepath, output_json_data) - @staticmethod - def process_json_files(county, case_json_folder_path): + def process_json_files(self, county: str, case_json_folder_path: str) -> None: """Processes all JSON files in the specified folder.""" - list_case_json_files = os.listdir(case_json_folder_path) + try: + list_case_json_files = os.listdir(case_json_folder_path) + except (FileNotFoundError, Exception) as e: + logging.error(f"Error reading directory {case_json_folder_path}: {e}") + return for case_json_filename in list_case_json_files: - Cleaner.process_single_case(county, case_json_folder_path, case_json_filename) + try: + self.process_single_case(county, case_json_folder_path, case_json_filename) + except Exception as e: + logging.error(f"Error processing file {case_json_filename}. Error: {e}") - @staticmethod - def clean(county): + def clean(self, county: str) -> None: """ Cleans and processes case data for a given county. This method performs the following steps: @@ -201,6 +220,10 @@ def clean(county): 5. Adds metadata, such as parsing date and case number, to the cleaned data. 6. Writes the cleaned data to the 'case_json_cleaned' folder for the specified county. """ - case_json_folder_path = Cleaner.get_folder_path(county, "case_json") - Cleaner.get_and_ensure_folder(county, "case_json_cleaned") - Cleaner.process_json_files(county, case_json_folder_path) + try: + case_json_folder_path = self.get_or_create_folder_path(county, "case_json") + logging.info(f"Processing data for county: {county}") + self.process_json_files(county, case_json_folder_path) + logging.info(f"Completed processing for county: {county}") + except Exception as e: + logging.error(f"Error during cleaning process for county: {county}. Error: {e}") From c7b121dfc856d2d8858360592df50d311becf671 Mon Sep 17 00:00:00 2001 From: Jason Lin Date: Tue, 1 Oct 2024 14:49:25 -0500 Subject: [PATCH 4/5] merged map_charge_names and load_and_map_charge_names, merged add_parsing_date directly into process_single_case, moved cleaned_folder_path step to process_json_files to stop redundancy in the loop --- src/cleaner/__init__.py | 32 ++++++++++---------------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/src/cleaner/__init__.py b/src/cleaner/__init__.py index 267d2a5..3ce8569 100644 --- a/src/cleaner/__init__.py +++ b/src/cleaner/__init__.py @@ -22,11 +22,6 @@ class Cleaner: def __init__(self): pass - def add_parsing_date(self, output_json_data: dict) -> dict: - """Adds the date of parsing to the final cleaned json file""" - output_json_data['parsing_date'] = dt.datetime.today().strftime('%Y-%m-%d') - return output_json_data - def get_or_create_folder_path(self, county: str, folder_type: str) -> str: """Returns and ensures the existence of the folder path.""" folder_path = os.path.join(os.path.dirname(__file__), "..", "..", "data", county.lower(), folder_type) @@ -49,10 +44,6 @@ def load_json_file(self, file_path: str) -> dict: logging.error(f"Error loading file at {file_path}: {e}") return {} - def map_charge_names(self, charge_data: list[dict]) -> dict: - """Creates a dictionary mapping charge names to their corresponding UMich data""" - return {item['charge_name']: item for item in charge_data} - def load_and_map_charge_names(self, file_path: str) -> dict: """Loads a JSON file and maps charge names to their corresponding UMich data.""" charge_data = self.load_json_file(file_path) @@ -60,14 +51,12 @@ def load_and_map_charge_names(self, file_path: str) -> dict: if not charge_data: logging.error(f"Failed to load charge data from {file_path}") raise FileNotFoundError(f"File not found or is empty: {file_path}") - # Create charge mapping from data + # Create dictionary mapping charge names try: - charge_mapping = self.map_charge_names(charge_data) + return {item['charge_name']: item for item in charge_data} except KeyError as e: logging.error(f"Error in mapping charge names: {e}") raise ValueError(f"Invalid data structure: {file_path}") - return charge_mapping - def process_charges(self, charges: list[dict], charge_mapping: dict) -> tuple[list[dict], str]: """ @@ -149,7 +138,7 @@ def write_json_output(self, file_path: str, data: dict) -> None: except OSError as e: logging.error(f"Failed to write JSON output to {file_path}: {e}") - def process_single_case(self, county: str, case_json_folder_path: str, case_json_filename:str) -> None: + def process_single_case(self, case_json_folder_path: str, case_json_filename:str, cleaned_folder_path: str) -> None: """Process a single case JSON file.""" input_json_path = os.path.join(case_json_folder_path, case_json_filename) input_dict = self.load_json_file(input_json_path) @@ -168,7 +157,8 @@ def process_single_case(self, county: str, case_json_folder_path: str, case_json "earliest_charge_date": "", "motions": [], "has_evidence_of_representation": False, - "defense_attorney": self.hash_defense_attorney(input_dict) + "defense_attorney": self.hash_defense_attorney(input_dict), + "parsing_date": dt.datetime.today().strftime('%Y-%m-%d') } # Load charge mappings @@ -186,12 +176,6 @@ def process_single_case(self, county: str, case_json_folder_path: str, case_json ) output_json_data["has_evidence_of_representation"] = len(output_json_data["motions"]) > 0 - # Add parsing date - output_json_data = self.add_parsing_date(output_json_data) - - # Ensure the case_json_cleaned folder exists - cleaned_folder_path = self.get_or_create_folder_path(county, "case_json_cleaned") - # Write output to file output_filepath = os.path.join(cleaned_folder_path, case_json_filename) self.write_json_output(output_filepath, output_json_data) @@ -203,9 +187,13 @@ def process_json_files(self, county: str, case_json_folder_path: str) -> None: except (FileNotFoundError, Exception) as e: logging.error(f"Error reading directory {case_json_folder_path}: {e}") return + + # Ensure the case_json_cleaned folder exists + cleaned_folder_path = self.get_or_create_folder_path(county, "case_json_cleaned") + for case_json_filename in list_case_json_files: try: - self.process_single_case(county, case_json_folder_path, case_json_filename) + self.process_single_case(case_json_folder_path, case_json_filename, cleaned_folder_path) except Exception as e: logging.error(f"Error processing file {case_json_filename}. Error: {e}") From 13464ab30176efbe57d95e41278806fdaac6b238 Mon Sep 17 00:00:00 2001 From: Jason Lin <137189766+jrex003@users.noreply.github.com> Date: Tue, 1 Oct 2024 15:35:10 -0500 Subject: [PATCH 5/5] Delete src/cleaner/run_cleaner.py --- src/cleaner/run_cleaner.py | 6 ------ 1 file changed, 6 deletions(-) delete mode 100644 src/cleaner/run_cleaner.py diff --git a/src/cleaner/run_cleaner.py b/src/cleaner/run_cleaner.py deleted file mode 100644 index 5b9f64b..0000000 --- a/src/cleaner/run_cleaner.py +++ /dev/null @@ -1,6 +0,0 @@ -# Test script for testing cleaner -from cleaner import Cleaner - -clean_instance = Cleaner() -county = 'hays' -clean_instance.clean(county)