diff --git a/examples/wifi/expanded/requests_wifi_api_openskynetwork_private_area.py b/examples/wifi/expanded/requests_wifi_api_openskynetwork_private_area.py index 8cd08a2..1652a73 100644 --- a/examples/wifi/expanded/requests_wifi_api_openskynetwork_private_area.py +++ b/examples/wifi/expanded/requests_wifi_api_openskynetwork_private_area.py @@ -1,155 +1,164 @@ -# SPDX-FileCopyrightText: 2023 DJDevon3 +# SPDX-FileCopyrightText: 2024 DJDevon3 # SPDX-License-Identifier: MIT -# Coded for Circuit Python 8.1 -# DJDevon3 ESP32-S3 OpenSkyNetwork_Private_Area_API_Example +# Coded for Circuit Python 8.2.x +"""OpenSky-Network.org Private Area API Example""" -import json +import binascii import os -import ssl import time -import circuitpython_base64 as base64 -import socketpool +import adafruit_connection_manager import wifi import adafruit_requests # OpenSky-Network.org Website Login required for this API +# Increased call limit vs Public. # REST API: https://openskynetwork.github.io/opensky-api/rest.html - # Retrieves all traffic within a geographic area (Orlando example) -latmin = "27.22" # east bounding box -latmax = "28.8" # west bounding box -lonmin = "-81.46" # north bounding box -lonmax = "-80.40" # south bounding box +LATMIN = "27.22" # east bounding box +LATMAX = "28.8" # west bounding box +LONMIN = "-81.46" # north bounding box +LONMAX = "-80.40" # south bounding box -# Initialize WiFi Pool (There can be only 1 pool & top of script) -pool = socketpool.SocketPool(wifi.radio) +# Get WiFi details, ensure these are setup in settings.toml +ssid = os.getenv("CIRCUITPY_WIFI_SSID") +password = os.getenv("CIRCUITPY_WIFI_PASSWORD") +osnusername = os.getenv("OSN_USERNAME") # Website Credentials +osnpassword = os.getenv("OSN_PASSWORD") # Website Credentials -# Time between API refreshes +# API Polling Rate # 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour # OpenSky-Networks IP bans for too many requests, check rate limit. # https://openskynetwork.github.io/opensky-api/rest.html#limitations -sleep_time = 1800 +SLEEP_TIME = 1800 -# Get WiFi details, ensure these are setup in settings.toml -ssid = os.getenv("CIRCUITPY_WIFI_SSID") -password = os.getenv("CIRCUITPY_WIFI_PASSWORD") -# No token required, only website login -osnu = os.getenv("OSN_Username") -osnp = os.getenv("OSN_Password") - -osn_cred = str(osnu) + ":" + str(osnp) -bytes_to_encode = b" " + str(osn_cred) + " " -base64_string = base64.encodebytes(bytes_to_encode) -base64cred = repr(base64_string)[2:-1] - -Debug_Auth = False # STREAMER WARNING this will show your credentials! -if Debug_Auth: - osn_cred = str(osnu) + ":" + str(osnp) - bytes_to_encode = b" " + str(osn_cred) + " " - print(repr(bytes_to_encode)) - base64_string = base64.encodebytes(bytes_to_encode) - print(repr(base64_string)[2:-1]) - base64cred = repr(base64_string)[2:-1] - print("Decoded Bytes:", str(base64cred)) - -# OSN requires your username:password to be base64 encoded -# so technically it's not transmitted in the clear but w/e -osn_header = {"Authorization": "Basic " + str(base64cred)} - -# Example request of all traffic over Florida, geographic areas cost less per call. +# Set debug to True for full JSON response. +# WARNING: makes credentials visible. based on how many flights +# in your area, full response could crash microcontroller +DEBUG = False + +# Initalize Wifi, Socket Pool, Request Session +pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio) +ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio) +requests = adafruit_requests.Session(pool, ssl_context) + +# -- Base64 Conversion -- +OSN_CREDENTIALS = str(osnusername) + ":" + str(osnpassword) +# base64 encode and strip appended \n from bytearray +OSN_CREDENTIALS_B = binascii.b2a_base64(OSN_CREDENTIALS.encode()).strip() +BASE64_STRING = OSN_CREDENTIALS_B.decode() # bytearray + +if DEBUG: + print("Base64 ByteArray: ", BASE64_STRING) + +# Area requires OpenSky-Network.org username:password to be base64 encoded +OSN_HEADER = {"Authorization": "Basic " + BASE64_STRING} + +# Example request of all traffic over Florida. +# Geographic areas calls cost less against the limit. # https://opensky-network.org/api/states/all?lamin=25.21&lomin=-84.36&lamax=30.0&lomax=-78.40 OPENSKY_SOURCE = ( "https://opensky-network.org/api/states/all?" + "lamin=" - + latmin + + LATMIN + "&lomin=" - + lonmin + + LONMIN + "&lamax=" - + latmax + + LATMAX + "&lomax=" - + lonmax + + LONMAX ) -# Converts seconds to human readable minutes/hours/days -def time_calc(input_time): # input_time in seconds +def time_calc(input_time): + """Converts seconds to minutes/hours/days""" if input_time < 60: - sleep_int = input_time - time_output = f"{sleep_int:.0f} seconds" - elif 60 <= input_time < 3600: - sleep_int = input_time / 60 - time_output = f"{sleep_int:.0f} minutes" - elif 3600 <= input_time < 86400: - sleep_int = input_time / 60 / 60 - time_output = f"{sleep_int:.1f} hours" - else: - sleep_int = input_time / 60 / 60 / 24 - time_output = f"{sleep_int:.1f} days" - return time_output + return f"{input_time:.0f} seconds" + if input_time < 3600: + return f"{input_time / 60:.0f} minutes" + if input_time < 86400: + return f"{input_time / 60 / 60:.0f} hours" + return f"{input_time / 60 / 60 / 24:.1f} days" def _format_datetime(datetime): - return "{:02}/{:02}/{} {:02}:{:02}:{:02}".format( - datetime.tm_mon, - datetime.tm_mday, - datetime.tm_year, - datetime.tm_hour, - datetime.tm_min, - datetime.tm_sec, + """F-String formatted struct time conversion""" + return ( + f"{datetime.tm_mon:02}/" + + f"{datetime.tm_mday:02}/" + + f"{datetime.tm_year:02} " + + f"{datetime.tm_hour:02}:" + + f"{datetime.tm_min:02}:" + + f"{datetime.tm_sec:02}" ) -# Connect to Wi-Fi -print("\n===============================") -print("Connecting to WiFi...") -request = adafruit_requests.Session(pool, ssl.create_default_context()) -while not wifi.radio.ipv4_address: +while True: + # Connect to Wi-Fi + print("\nConnecting to WiFi...") + while not wifi.radio.ipv4_address: + try: + wifi.radio.connect(ssid, password) + except ConnectionError as e: + print("❌ Connection Error:", e) + print("Retrying in 10 seconds") + print("✅ Wifi!") + try: - wifi.radio.connect(ssid, password) - except ConnectionError as e: - print("Connection Error:", e) - print("Retrying in 10 seconds") - time.sleep(10) -print("Connected!\n") + print(" | Attempting to GET OpenSky-Network Area Flights JSON!") + try: + opensky_response = requests.get(url=OPENSKY_SOURCE, headers=OSN_HEADER) + opensky_json = opensky_response.json() + except ConnectionError as e: + print("Connection Error:", e) + print("Retrying in 10 seconds") -while True: - # STREAMER WARNING this will show your credentials! - debug_request = False # Set True to see full request - if debug_request: - print("Full API HEADER: ", str(osn_header)) - print("Full API GET URL: ", OPENSKY_SOURCE) - print("===============================") + print(" | ✅ OpenSky-Network JSON!") - print("\nAttempting to GET OpenSky-Network Data!") - opensky_response = request.get(url=OPENSKY_SOURCE, headers=osn_header).json() + if DEBUG: + print("Full API GET URL: ", OPENSKY_SOURCE) + print(opensky_json) - # Print Full JSON to Serial (doesn't show credentials) - debug_response = False # Set True to see full response - if debug_response: - dump_object = json.dumps(opensky_response) - print("JSON Dump: ", dump_object) + # ERROR MESSAGE RESPONSES + if "timestamp" in opensky_json: + osn_timestamp = opensky_json["timestamp"] + print(f"❌ Timestamp: {osn_timestamp}") - # Key:Value Serial Debug (doesn't show credentials) - osn_debug_keys = True # Set True to print Serial data - if osn_debug_keys: - try: - osn_flight = opensky_response["time"] - print("Current Unix Time: ", osn_flight) + if "message" in opensky_json: + osn_message = opensky_json["message"] + print(f"❌ Message: {osn_message}") + + if "error" in opensky_json: + osn_error = opensky_json["error"] + print(f"❌ Error: {osn_error}") + + if "path" in opensky_json: + osn_path = opensky_json["path"] + print(f"❌ Path: {osn_path}") - current_struct_time = time.localtime(osn_flight) - current_date = "{}".format(_format_datetime(current_struct_time)) - print(f"Unix to Readable Time: {current_date}") + if "status" in opensky_json: + osn_status = opensky_json["status"] + print(f"❌ Status: {osn_status}") - # Current flight data for single callsign (right now) - osn_all_flights = opensky_response["states"] + # Current flight data for single callsign (right now) + osn_all_flights = opensky_json["states"] + + if osn_all_flights is not None: + if DEBUG: + print(f" | | Area Flights Full Response: {osn_all_flights}") + + osn_time = opensky_json["time"] + # print(f" | | Last Contact Unix Time: {osn_time}") + osn_struct_time = time.localtime(osn_time) + osn_readable_time = f"{_format_datetime(osn_struct_time)}" + print(f" | | Timestamp: {osn_readable_time}") if osn_all_flights is not None: # print("Flight Data: ", osn_all_flights) for flights in osn_all_flights: - osn_t = f"Trans:{flights[0]} " - osn_c = f"Sign:{flights[1]} " + osn_t = f" | | Trans:{flights[0]} " + osn_c = f"Sign:{flights[1]}" osn_o = f"Origin:{flights[2]} " osn_tm = f"Time:{flights[3]} " osn_l = f"Last:{flights[4]} " @@ -171,16 +180,20 @@ def _format_datetime(datetime): string2 = f"{osn_la}{osn_ba}{osn_g}{osn_v}{osn_h}{osn_vr}" string3 = f"{osn_s}{osn_ga}{osn_sq}{osn_pr}{osn_ps}{osn_ca}" print(f"{string1}{string2}{string3}") - else: - print("Flight has no active data or you're polling too fast.") - - print("\nFinished!") - print("Board Uptime: ", time_calc(time.monotonic())) - print("Next Update: ", time_calc(sleep_time)) - time.sleep(sleep_time) - print("===============================") - - except (ConnectionError, ValueError, NameError) as e: - print("OSN Connection Error:", e) - print("Next Retry: ", time_calc(sleep_time)) - time.sleep(sleep_time) + + else: + print(" | | ❌ Area has no active data or you're polling too fast.") + + opensky_response.close() + print("✂️ Disconnected from OpenSky-Network API") + + print("\nFinished!") + print(f"Board Uptime: {time_calc(time.monotonic())}") + print(f"Next Update: {time_calc(SLEEP_TIME)}") + print("===============================") + + except (ValueError, RuntimeError) as e: + print(f"Failed to get data, retrying\n {e}") + time.sleep(60) + break + time.sleep(SLEEP_TIME)