From 18fdd4aa4eacf0ebc7981323b2db4a22d7942bb4 Mon Sep 17 00:00:00 2001 From: Conner Vieira Date: Thu, 5 Sep 2024 12:40:02 -0400 Subject: [PATCH] Added background ALPR to dash-cam mode --- CHANGELOG.md | 1 + alpr.py | 118 +++++++++++++++++++++++++++++++++++++++++++ dashcam.py | 126 +++++++++++++++++++++++++++++++++++++++++++++- docs/CONFIGURE.md | 2 +- main.py | 19 +++---- utils.py | 102 ------------------------------------- 6 files changed, 252 insertions(+), 116 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 46109cb..6c8fed5 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -495,3 +495,4 @@ This update focuses on unifying Predator's different modes to allow multipurpose - Remote alert database sources can now be cached. - This allows Predator to continue using entries from a remote alert database even when the source goes offline. - Migrated most of the ALPR processing to a dedicated file for sake of organization. +- The "detected plate" notification sound now plays regardless of the console output level. diff --git a/alpr.py b/alpr.py index a4c00f2..a5725a8 100755 --- a/alpr.py +++ b/alpr.py @@ -17,6 +17,7 @@ import json # Required to process JSON data + predator_root_directory = str(os.path.dirname(os.path.realpath(__file__))) # This variable determines the folder path of the root Predator directory. This should usually automatically recognize itself, but if it doesn't, you can change it manually. @@ -31,6 +32,16 @@ exit() +try: + if (config["developer"]["offline"] == False): # Only import networking libraries if offline mode is turned off. + if (config["general"]["status_lighting"]["enabled"] == True or config["realtime"]["push_notifications"]["enabled"] == True or len(config["general"]["alerts"]["databases"]) > 0): + import requests # Required to make network requests + import validators # Required to validate URLs +except: + print("Failed to determine if network features are enabled in the configuration.") +if (len(config["general"]["alerts"]["databases"]) > 0): + import hashlib + import utils style = utils.style # Load style information from the utils script. @@ -110,6 +121,28 @@ def alpr_get_queued_plates(): # This function is used to fetch the latest queue +# This function validates a license plate given a template. +def validate_plate(plate, template): + plate_valid = True # By default, the plate is valid, until we find a character that doesn't align. + + if (len(template) == len(plate)): # Make sure the template and plate are the same length. If so, continue with validation. Otherwise, automatically invalidate the plate, and skip the rest of the validation process. + for x in range(len(template)): + if (template[x].isalpha() == plate[x].isalpha() or template[x].isnumeric() == plate[x].isnumeric()): # If this character is alphabetical in both the template and plate, or if this character is numeric in both the template and plate, then this character is valid. + # This character is valid, so don't change anything. + pass + else: + # This character doesn't match between the template and plate, so mark the plate as invalid. + plate_valid = False + break # Exit the loop now, since we already know the plate is invalid. + else: + plate_valid = False + + return plate_valid # Return the results of the plate validation + + + + + # This function loads the license plate log, and initializes the file if necessary. def load_alpr_log(): global config @@ -150,3 +183,88 @@ def run_alpr(image_filepath): reading_output = {} return reading_output + + + + + +# This function is used to display a list of provided license plate alerts. +def display_alerts(active_alerts): + for alert in active_alerts: # Iterate through each active alert. + # Display an alert that is starkly different from the rest of the console output. + print(style.yellow + style.bold) + print("===================") + print("ALERT HIT - " + str(alert)) + if ("rule" in active_alerts[alert]): # Check to see if a rule exists for this alert plate. This should always be the case, but it's worth checking for sake of stability. + print("Rule: " + str(active_alerts[alert]["rule"])) # Display the rule that triggered this alert. + if ("name" in active_alerts[alert]): # Check to see if a name exists for this alert plate. + print("Name: " + str(active_alerts[alert]["name"])) # Display this alert plate's name. + if ("description" in active_alerts[alert]): # Check to see if a name exists for this alert plate. + print("Description: " + str(active_alerts[alert]["description"])) # Display this alert plate's description. + print("===================") + print(style.end + style.end) + + + + +# The following functions are responsible for loading alert database. +def load_alert_database_remote(source, cache_directory): + debug_message("Loading remote database source.") + if (config["realtime"]["saving"]["remote_alert_sources"] == True): + source_hash = hashlib.md5(source.encode()).hexdigest() + if (config["developer"]["offline"] == False): # Check to see if offline mode is disabled. + try: + raw_download_data = requests.get(source, timeout=6).text # Save the raw text data from the URL to a variable. + except: + raw_download_data = "{}" + display_message("The license plate alert database from " + source + " could not be loaded.", 2) + if (config["realtime"]["saving"]["remote_alert_sources"] == True): + if (os.path.exists(cache_directory + "/" + source_hash + ".json")): # Check to see if the cached file exists. + debug_message("Attempting to load locally cached data for this remote source.") + return load_alert_database_local(cache_directory + "/" + source_hash + ".json") # Load the locally cached file. + processed_download_data = str(raw_download_data) # Convert the downloaded data to a string. + try: + alert_database = json.loads(processed_download_data) # Load the alert database as JSON data. + if (config["realtime"]["saving"]["remote_alert_sources"] == True): + if (os.path.isdir(cache_directory) == False): + os.system("mkdir -p '" + str(cache_directory) + "'") + save_to_file(cache_directory + "/" + source_hash + ".json", json.dumps(alert_database)) + except: + alert_database = {} + display_message("The license plate alert database returned by the remote source " + source + " doesn't appear to be compatible JSON data. This source has not been loaded.", 2) + else: # Predator is in offline mode, but a remote alert database source was specified. + alert_database = {} # Set the alert database to an empty dictionary. + display_message("A remote alert database source " + source + " was specified, but Predator is in offline mode. This source has not been loaded.", 2) + return alert_database + +def load_alert_database_local(source): + debug_message("Loading local database source.") + if (os.path.exists(source)): # Check to see if the database specified by the user actually exists. + f = open(source, "r") # Open the user-specified database file. + file_contents = f.read() # Read the file. + if (file_contents[0] == "{"): # Check to see if the first character in the file indicates that this alert database is a JSON database. + alert_database = json.loads(file_contents) # Load the alert database as JSON data. + else: + alert_database = {} + display_message("The alert database specified at " + source + " does appear to contain compatible JSON data. This source has not been loaded.", 3) + f.close() # Close the file. + else: # If the alert database specified by the user does not exist, alert the user of the error. + alert_database = {} + display_message("The alert database specified at " + source + " does not exist. This source has not been loaded.", 3) + + return alert_database + +def load_alert_database(sources, project_directory): # This function compiles the provided list of sources into a single complete alert dictionary. + cache_directory = project_directory + "/" + config["realtime"]["saving"]["remote_alert_sources"]["directory"] + debug_message("Loading license plate alert list") + complete_alert_database = {} # Set the complete alert database to a placeholder dictionary. + for source in sources: # Iterate through each source in the list of sources. + if (validators.url(source)): # Check to see if the user supplied a URL as their alert database. + alert_database = load_alert_database_remote(source, cache_directory) + else: # The input the user supplied doesn't appear to be a URL, so assume it is a file. + alert_database = load_alert_database_local(project_directory + "/" + source) + + for rule in alert_database: # Iterate over each rule in this database. + complete_alert_database[rule] = alert_database[rule] # Add this rule to the complete alert database. + + return complete_alert_database diff --git a/dashcam.py b/dashcam.py index 0da97b6..f81f735 100755 --- a/dashcam.py +++ b/dashcam.py @@ -13,6 +13,7 @@ import os # Required to interact with certain operating system functions import json # Required to process JSON data +import fnmatch # Required to use wildcards to check strings. predator_root_directory = str(os.path.dirname(os.path.realpath(__file__))) # This variable determines the folder path of the root Predator directory. This should usually automatically recognize itself, but it if it doesn't, you can change it manually. @@ -35,6 +36,7 @@ heartbeat = utils.heartbeat update_state = utils.update_state convert_speed = utils.convert_speed +save_to_file = utils.save_to_file import threading import time @@ -804,16 +806,136 @@ def capture_dashcam_video(directory, device="main", width=1280, height=720): # This function runs in a seperate thread from the main dashcam capture, and will intermittently grab the most recent frame, and run ALPR on it. +if (config["realtime"]["saving"]["license_plates"]["enabled"] == True): # Check to see if the license plate logging file name is not empty. If the file name is empty, then license plate logging will be disabled. + plate_log = alpr.load_alpr_log() def background_alpr(device): global current_frame_data + if (config["realtime"]["saving"]["license_plates"]["enabled"] == True): # Check to see if the license plate logging file name is not empty. If the file name is empty, then license plate logging will be disabled. + global plate_log + while dashcam_recording_active: # Run until dashcam capture finishes. if (device in current_frame_data): + debug_message("Running ALPR") temporary_image_filepath = config["general"]["interface_directory"] + "/DashcamALPR_" + str(device) + ".jpg" # Determine where this frame will be temporarily saved for processing. cv2.imwrite(temporary_image_filepath, current_frame_data[device]) # Write this frame to the interface directory. alpr_results = alpr.run_alpr(temporary_image_filepath) # Run ALPR on the frame. + + debug_message("Validating plates") + detected_plates_valid = [] # This will hold all of the plates that pass the validation sequence. + detected_plates_all = [] # This will hold all plates detected, regardless of validation. if (len(alpr_results["results"]) > 0): # Check to see if at least one plate was detected. - print(alpr_results) # TODO: Handle ALPR results. - time.sleep(float(config["dashcam"]["alpr"]["interval"])) + for result in alpr_results["results"]: + guesses_valid = {} # This is a temporary dictionary that will hold the valid guesses before they are added to the complete list of detected plates. + guesses_all = {} # This is a temporary dictionary that will hold all guesses before they are added to the complete list of detected plates. + for candidate in result["candidates"]: + if (candidate["confidence"] >= float(config["general"]["alpr"]["validation"]["confidence"])): # Check to make sure this plate exceeds the minimum confidence threshold. + if any(alpr.validate_plate(candidate["plate"], format_template) for format_template in config["general"]["alpr"]["validation"]["license_plate_format"]) or len(config["general"]["alpr"]["validation"]["license_plate_format"]) == 0: # Check to see if this plate passes validation. + guesses_valid[candidate["plate"]] = candidate["confidence"] # Add this plate to the list of valid guesses. + guesses_all[candidate["plate"]] = candidate["confidence"] # Add this plate to the list of valid guesses. + + if (len(guesses_valid) == 0): # If there are no valid guesses, then check to see if "best_effort" mode is enabled. + if (config["general"]["alpr"]["validation"]["best_effort"] == True): # Check to see if "best_effort" is enabled. + guesses_valid[result["candidates"][0]["plate"]] = result["candidates"][0]["confidence"] # Add the most likely plate to the valid guesses. + + if (len(guesses_valid) > 0): # Check to see if there is at least one valid guess. + detected_plates_valid.append(guesses_valid) # Add the valid guesses as a new plate. + if (len(guesses_all) > 0): # Check to see if there is at least one guess. + detected_plates_all.append(guesses_all) # Add the guesses as a new plate. + del guesses_valid + del guesses_all + + + debug_message("Checking for alerts") + if (config["general"]["alerts"]["alerts_ignore_validation"]): + plates_to_check_alerts = detected_plates_all + else: + plates_to_check_alerts = detected_plates_valid + alert_database = alpr.load_alert_database(config["general"]["alerts"]["databases"], config["general"]["working_directory"]) # Load the license plate alert database. + active_alerts = {} # This is an empty placeholder that will hold all of the active alerts. + if (len(alert_database) > 0): # Only run alert processing if the alert database isn't empty. + for rule in alert_database: # Run through every plate in the alert plate database supplied by the user. + for plate in plates_to_check_alerts: # Iterate through each of the plates detected this round, regardless of whether or not they were validated. + for guess in plate: # Run through each of the plate guesses generated by ALPR, regardless of whether or not they are valid according to the plate formatting guideline. + if (fnmatch.fnmatch(guess, rule)): # Check to see this detected plate guess matches this particular plate in the alert database, taking wildcards into account. + active_alerts[guess] = {} + active_alerts[guess]["rule"] = rule # Add this plate to the active alerts dictionary with the rule that triggered it. + if ("name" in alert_database[rule]): + active_alerts[guess]["name"] = alert_database[rule]["name"] + if ("description" in alert_database[rule]): + active_alerts[guess]["description"] = alert_database[rule]["description"] + if (config["general"]["alerts"]["allow_duplicate_alerts"] == False): + break # Break the loop if an alert is found for this guess, in order to avoid triggering multiple alerts for each guess of the same plate. + + + + # Save detected license plates to file. + if (config["realtime"]["saving"]["license_plates"]["enabled"] == True): # Check to see if license plate history saving is enabled. + debug_message("Saving license plate history") + + if (len(detected_plates_all) > 0): # Only save the license plate history for this round if 1 or more plates were detected. + current_time = time.time() # Get the current timestamp. + + plate_log[current_time] = {} # Initialize an entry in the plate history log using the current time. + + if (config["realtime"]["gps"]["alpr_location_tagging"] == True): # Check to see if the configuration value for geotagging license plate detections has been enabled. + if (config["general"]["gps"]["enabled"] == True): # Check to see if GPS functionality is enabled. + current_location = get_gps_location() # Get the current location. + else: + current_location = [0.0, 0.0] # Grab a placeholder for the current location, since GPS functionality is disabled. + + plate_log[current_time]["location"] = {"lat": current_location[0],"lon": current_location[1]} # Add the current location to the plate history log entry. + + plate_log[current_time]["plates"] = {} + + for plate in detected_plates_all: # Iterate though each plate detected this round. + top_plate = list(plate.keys())[0] + if (config["realtime"]["saving"]["license_plates"]["save_guesses"] == True): # Only initialize the plate's guesses to the log if Predator is configured to do so. + plate_log[current_time]["plates"][top_plate] = {"alerts": [], "guesses": {}} # Initialize this plate in the plate log. + else: + plate_log[current_time]["plates"][top_plate] = {"alerts": []} # Initialize this plate in the plate log. + for guess in plate: # Iterate through each guess in this plate. + if (guess in active_alerts): # Check to see if this guess matches one of the active alerts. + plate_log[current_time]["plates"][top_plate]["alerts"].append(active_alerts[guess]) # Add the rule that triggered the alert to a separate list. + if (config["realtime"]["saving"]["license_plates"]["save_guesses"] == True): # Only add this guess to the log if Predator is configured to do so. + plate_log[current_time]["plates"][top_plate]["guesses"][guess] = plate[guess] # Add this guess to the log, with its confidence level. + + save_to_file(config["general"]["working_directory"] + "/" + config["realtime"]["saving"]["license_plates"]["file"], json.dumps(plate_log)) # Save the modified plate log to the disk as JSON data. + + + + # Issue interface directory updates. + if (config["general"]["interface_directory"] != ""): # Check to see if the interface directory is enabled. + debug_message("Issuing interface updates") + heartbeat() # Issue a status heartbeat. + + # Reformat the plates to the format expected by the interface directory. + plates_to_save_to_interface = {} + for plate in detected_plates_valid: + top_plate = list(plate.keys())[0] + plates_to_save_to_interface[top_plate] = {} + for guess in plate: + plates_to_save_to_interface[top_plate][guess] = plate[guess] + + utils.log_plates(plates_to_save_to_interface) # Update the list of recently detected license plates. + utils.log_alerts(active_alerts) # Update the list of active alerts. + + + # Display alerts. + alpr.display_alerts(active_alerts) # Display active alerts. + for plate in detected_plates_valid: + utils.play_sound("notification") + for alert in active_alerts: # Run once for each active alert. + if (config["realtime"]["push_notifications"]["enabled"] == True): # Check to see if the user has Gotify notifications enabled. + debug_message("Issuing alert push notification") + os.system("curl -X POST '" + config["realtime"]["push_notifications"]["server"] + "/message?token=" + config["realtime"]["push_notifications"]["token"] + "' -F 'title=Predator' -F 'message=A license plate in an alert database has been detected: " + detected_plate + "' > /dev/null 2>&1 &") # Send a push notification using Gotify. + + if (config["realtime"]["interface"]["display"]["shape_alerts"] == True): # Check to see if the user has enabled shape notifications. + utils.display_shape("triangle") # Display an ASCII triangle in the output. + + utils.play_sound("alert") # Play the alert sound, if configured to do so. + + + time.sleep(float(config["dashcam"]["alpr"]["interval"])) # Sleep (if configured to do so) before starting the next processing loop. diff --git a/docs/CONFIGURE.md b/docs/CONFIGURE.md index 9d29309..23ba7f7 100755 --- a/docs/CONFIGURE.md +++ b/docs/CONFIGURE.md @@ -322,7 +322,7 @@ This document describes the configuration values found `config.json`. - `"mps"` for meters-per-second - `"fps"` for feet-per-second - `"knot"` for knots - - `alpr` contains settings for controlling if and how Predator runs ALPR in the background while dashcam recording. + - `alpr` contains settings for controlling if and how Predator runs ALPR in the background while dashcam recording. Note that this feature will not run ALPR at the same performance as the dedicated real-time ALPR mode. - `enabled` is a boolean that determines if Predator will run ALPR in the background on dashcam threads. - When set to `false`, dashcam recording will continue as normal without ALPR. - When set to `true`, dashcam recording will run with ALPR analysis on a separate thread. diff --git a/main.py b/main.py index 8d949b4..7d465b2 100755 --- a/main.py +++ b/main.py @@ -43,15 +43,12 @@ process_gpx = utils.process_gpx # Load the GPX processing function from the utils script. save_to_file = utils.save_to_file # Load the file saving function from the utils script. add_to_file = utils.add_to_file # Load the file appending function from the utils script. -validate_plate = utils.validate_plate # Load the plate validation function from the utils script. display_shape = utils.display_shape # Load the shape displaying function from the utils script. countdown = utils.countdown # Load the timer countdown function from the utils script. get_gps_location = utils.get_gps_location # Load the function to get the current GPS location. convert_speed = utils.convert_speed # Load the function used to convert speeds from meters per second to other units. display_number = utils.display_number # Load the function used to display numbers as large ASCII font. closest_key = utils.closest_key # Load the function used to find the closest entry in a dictionary to a given number. -display_alerts = utils.display_alerts # Load the function used to display license plate alerts given the dictionary of alerts. -load_alert_database = utils.load_alert_database # Load the function used to load license plate alert databases. heartbeat = utils.heartbeat # Load the function to issue heartbeats to the interface directory. update_state = utils.update_state # Load the function to issue state updates to the interface directory. log_plates = utils.log_plates # Load the function to issue ALPR results to the interface directory. @@ -63,6 +60,8 @@ if (config["general"]["modes"]["enabled"]["realtime"] == True): import alpr + display_alerts = alpr.display_alerts # Load the function used to display license plate alerts given the dictionary of alerts. + load_alert_database = alpr.load_alert_database # Load the function used to load license plate alert databases. if (config["general"]["modes"]["enabled"]["dashcam"] == True): # Check to see if OpenCV is needed. import dashcam @@ -88,7 +87,6 @@ print("To reset so this message is displayed again, remove the `install.json` file inside the main install directory." + style.end) input(style.faint + "Press enter to continue..." + style.end) - print("") clear() print(style.bold + style.red + "Commercial Support" + style.end) print(style.bold + "V0LT offers the following commercial support services for Predator:" + style.end) @@ -103,21 +101,19 @@ print("To learn more, don't hesitate to get in contact: " + style.underline + "https://v0lttech.com/contact.php\n" + style.end) input(style.faint + "Press enter to continue..." + style.end) - print("") clear() print(style.bold + style.red + "Warranty" + style.end) print("While Predator is designed to be as reliable and consistent as possible, it comes with absolutely no warranty, it should not be used in a context where failure could cause harm to people or property.") print("For more information, see the `SECURITY.md` document.") input(style.faint + "Press enter to continue..." + style.end) - print("") clear() print(style.bold + style.red + "Privacy" + style.end) print("Predator does not share telemetry or usage data with V0LT, or any other entity. However, by default, Predator will attach a random identifier to requests made to remote license plate list sources (as configured under `general>alerts>databases`). This identifier allows administrators of servers hosting license plate lists to roughly count how many clients are using their lists. If you're concerned about the administrator of one of your remote license plate lists using this unique identifier to derive information about how often you use Predator (based on when you fetch their lists), you can disable this functionality using the `developer>identify_to_remote_sources` configuration value.") + print("Additionally, by default, Predator will fetch a hardcoded ignore list from the V0LT website. Once again, this functionality does not send any identifiable information or telemetry data. To disable this functionality, either enable the `developer>offline` configuration value to disable all network requests, or remove the hardcoded ignore list from `ignore.py`.") print("For more information, see the `docs/CONFIGURE.md` document.") input(style.faint + "Press enter to continue..." + style.end) - print("") clear() print(style.bold + style.red + "Funding" + style.end) print("Predator is completely free to use, and doesn't contain monetization like advertising or sponsorships. If you find the project to be useful, please consider supporting it financially.") @@ -936,7 +932,7 @@ for plate in alpr_frames[frame]: # Iterate through each plate detected per frame. for guess in alpr_frames[frame][plate]: # Iterate through each guess for each plate. if (alpr_frames[frame][plate][guess] >= float(config["general"]["alpr"]["validation"]["confidence"])): # Check to make sure this plate's confidence is higher than the minimum threshold set in the configuration. - if any(validate_plate(guess, format_template) for format_template in config["general"]["alpr"]["validation"]["license_plate_format"]) or "" in config["general"]["alpr"]["validation"]["license_plate_format"]: # Check to see if this plate passes validation. + if any(alpr.validate_plate(guess, format_template) for format_template in config["general"]["alpr"]["validation"]["license_plate_format"]) or "" in config["general"]["alpr"]["validation"]["license_plate_format"]: # Check to see if this plate passes validation. if (plate not in validated_alpr_frames[frame]): # Check to see if this plate hasn't been added to the validated information yet. validated_alpr_frames[frame][plate] = [] # Add the plate to the validated information as a blank placeholder list. validated_alpr_frames[frame][plate].append(guess) # Since this plate guess failed the validation test, delete it from the list of guesses. @@ -1387,7 +1383,7 @@ print (" Plate guesses:") for plate_guess in all_current_plate_guesses[individual_detected_plate]: # Iterate through each plate and grab the first plate that matches the plate formatting guidelines as the 'detected plate'. if (all_current_plate_guesses[individual_detected_plate][plate_guess] >= float(config["general"]["alpr"]["validation"]["confidence"])): # Check to make sure this plate's confidence is higher than the minimum threshold set in the configuration. - if any([validate_plate(plate_guess, format_template) for format_template in config["general"]["alpr"]["validation"]["license_plate_format"]]): # Check to see whether or not the plate passes the validation based on the format specified by the user. + if any([alpr.validate_plate(plate_guess, format_template) for format_template in config["general"]["alpr"]["validation"]["license_plate_format"]]): # Check to see whether or not the plate passes the validation based on the format specified by the user. detected_plate = plate_guess # Grab the validated plate as the 'detected plate'. successfully_found_plate = True # The plate was successfully validated, so indicate that a plate was successfully found this round. if (config["realtime"]["interface"]["display"]["show_validation"] == True): # Only print the validated plate if the configuration says to do so. @@ -1441,10 +1437,11 @@ if (config["realtime"]["interface"]["display"]["output_level"] >= 3): # Only display this status message if the output level indicates to do so. print("Displaying detected license plates...") + for plate in new_plates_detected: + play_sound("notification") if (config["realtime"]["interface"]["display"]["output_level"] >= 2): # Only display this status message if the output level indicates to do so. print("Plates detected: " + str(len(new_plates_detected))) # Display the number of license plates detected this round. for plate in new_plates_detected: - play_sound("notification") print(" Detected plate: " + plate) # Print the detected plate. @@ -1532,7 +1529,7 @@ plate_log[current_time]["plates"][plate]["alerts"] = list(dict.fromkeys(plate_log[current_time]["plates"][plate]["alerts"])) # De-duplicate the 'alerts' list for this plate. - save_to_file(plate_log_file_location, json.dumps(plate_log)) # Save the modified plate log to the disk as JSON data. + save_to_file(config["general"]["working_directory"] + "/" + config["realtime"]["saving"]["license_plates"]["file"], json.dumps(plate_log)) # Save the modified plate log to the disk as JSON data. diff --git a/utils.py b/utils.py index c1fd723..548c88a 100755 --- a/utils.py +++ b/utils.py @@ -479,23 +479,6 @@ def play_sound(sound_id): -# This function validates a license plate given a template. -def validate_plate(plate, template): - plate_valid = True # By default, the plate is valid, until we find a character that doesn't align. - - if (len(template) == len(plate)): # Make sure the template and plate are the same length. If so, continue with validation. Otherwise, automatically invalidate the plate, and skip the rest of the validation process. - for x in range(len(template)): - if (template[x].isalpha() == plate[x].isalpha() or template[x].isnumeric() == plate[x].isnumeric()): # If this character is alphabetical in both the template and plate, or if this character is numeric in both the template and plate, then this character is valid. - # This character is valid, so don't change anything. - pass - else: - # This character doesn't match between the template and plate, so mark the plate as invalid. - plate_valid = False - else: - plate_valid = False - - return plate_valid # Return the results of the plate validation - @@ -779,91 +762,6 @@ def closest_key(array, search_key): - -# This function is used to display a list of provided license plate alerts. -def display_alerts(active_alerts): - for alert in active_alerts: # Iterate through each active alert. - # Display an alert that is starkly different from the rest of the console output. - print(style.yellow + style.bold) - print("===================") - print("ALERT HIT - " + str(alert)) - if ("rule" in active_alerts[alert]): # Check to see if a rule exists for this alert plate. This should always be the case, but it's worth checking for sake of stability. - print("Rule: " + str(active_alerts[alert]["rule"])) # Display the rule that triggered this alert. - if ("name" in active_alerts[alert]): # Check to see if a name exists for this alert plate. - print("Name: " + str(active_alerts[alert]["name"])) # Display this alert plate's name. - if ("description" in active_alerts[alert]): # Check to see if a name exists for this alert plate. - print("Description: " + str(active_alerts[alert]["description"])) # Display this alert plate's description. - print("===================") - print(style.end + style.end) - - - - -# The following functions are responsible for loading alert database. -def load_alert_database_remote(source, cache_directory): - debug_message("Loading remote database source.") - if (config["realtime"]["saving"]["remote_alert_sources"] == True): - source_hash = hashlib.md5(source.encode()).hexdigest() - if (config["developer"]["offline"] == False): # Check to see if offline mode is disabled. - try: - raw_download_data = requests.get(source, timeout=6).text # Save the raw text data from the URL to a variable. - except: - raw_download_data = "{}" - display_message("The license plate alert database from " + source + " could not be loaded.", 2) - if (config["realtime"]["saving"]["remote_alert_sources"] == True): - if (os.path.exists(cache_directory + "/" + source_hash + ".json")): # Check to see if the cached file exists. - debug_message("Attempting to load locally cached data for this remote source.") - return load_alert_database_local(cache_directory + "/" + source_hash + ".json") # Load the locally cached file. - processed_download_data = str(raw_download_data) # Convert the downloaded data to a string. - try: - alert_database = json.loads(processed_download_data) # Load the alert database as JSON data. - if (config["realtime"]["saving"]["remote_alert_sources"] == True): - if (os.path.isdir(cache_directory) == False): - os.system("mkdir -p '" + str(cache_directory) + "'") - save_to_file(cache_directory + "/" + source_hash + ".json", json.dumps(alert_database)) - except: - alert_database = {} - display_message("The license plate alert database returned by the remote source " + source + " doesn't appear to be compatible JSON data. This source has not been loaded.", 2) - else: # Predator is in offline mode, but a remote alert database source was specified. - alert_database = {} # Set the alert database to an empty dictionary. - display_message("A remote alert database source " + source + " was specified, but Predator is in offline mode. This source has not been loaded.", 2) - - return alert_database - -def load_alert_database_local(source): - debug_message("Loading local database source.") - if (os.path.exists(source)): # Check to see if the database specified by the user actually exists. - f = open(source, "r") # Open the user-specified database file. - file_contents = f.read() # Read the file. - if (file_contents[0] == "{"): # Check to see if the first character in the file indicates that this alert database is a JSON database. - alert_database = json.loads(file_contents) # Load the alert database as JSON data. - else: - alert_database = {} - display_message("The alert database specified at " + source + " does appear to contain compatible JSON data. This source has not been loaded.", 3) - f.close() # Close the file. - else: # If the alert database specified by the user does not exist, alert the user of the error. - alert_database = {} - display_message("The alert database specified at " + source + " does not exist. This source has not been loaded.", 3) - - return alert_database - -def load_alert_database(sources, project_directory): # This function compiles the provided list of sources into a single complete alert dictionary. - cache_directory = project_directory + "/" + config["realtime"]["saving"]["remote_alert_sources"]["directory"] - debug_message("Loading license plate alert list") - complete_alert_database = {} # Set the complete alert database to a placeholder dictionary. - for source in sources: # Iterate through each source in the list of sources. - if (validators.url(source)): # Check to see if the user supplied a URL as their alert database. - alert_database = load_alert_database_remote(source, cache_directory) - else: # The input the user supplied doesn't appear to be a URL, so assume it is a file. - alert_database = load_alert_database_local(project_directory + "/" + source) - - for rule in alert_database: # Iterate over each rule in this database. - complete_alert_database[rule] = alert_database[rule] # Add this rule to the complete alert database. - - return complete_alert_database - - - def sizeof_fmt(num, suffix='B'): for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']: if abs(num) < 1024.0: