diff --git a/examples/wifi/expanded/requests_wifi_api_openskynetwork_private.py b/examples/wifi/expanded/requests_wifi_api_openskynetwork_private.py index 486a4de..4a30b57 100644 --- a/examples/wifi/expanded/requests_wifi_api_openskynetwork_private.py +++ b/examples/wifi/expanded/requests_wifi_api_openskynetwork_private.py @@ -1,15 +1,13 @@ -# SPDX-FileCopyrightText: 2023 DJDevon3 +# SPDX-FileCopyrightText: 2024 DJDevon3 # SPDX-License-Identifier: MIT -# Coded for Circuit Python 8.1 -# DJDevon3 ESP32-S3 OpenSkyNetwork_Private_API_Example +# Coded for Circuit Python 8.2.x +"""OpenSky-Network.org Single Flight Private API Example""" -import json +import binascii import os -import ssl import time -import circuitpython_base64 as base64 -import socketpool +import adafruit_connection_manager import wifi import adafruit_requests @@ -19,135 +17,177 @@ # All active flights JSON: https://opensky-network.org/api/states/all # PICK ONE! :) # JSON order: transponder, callsign, country # ACTIVE transpondes only, for multiple "c822af&icao24=cb3993&icao24=c63923" -transponder = "7c6b2d" +TRANSPONDER = "4b1812" -# Initialize WiFi Pool (There can be only 1 pool & top of script) -pool = socketpool.SocketPool(wifi.radio) +# Get WiFi details, ensure these are setup in settings.toml +ssid = os.getenv("CIRCUITPY_WIFI_SSID") +password = os.getenv("CIRCUITPY_WIFI_PASSWORD") +osnusername = os.getenv("OSN_USERNAME") # Website Credentials +osnpassword = os.getenv("OSN_PASSWORD") # Website Credentials -# Time between API refreshes +# API Polling Rate # 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour # OpenSky-Networks IP bans for too many requests, check rate limit. # https://openskynetwork.github.io/opensky-api/rest.html#limitations -sleep_time = 1800 +SLEEP_TIME = 1800 -# Get WiFi details, ensure these are setup in settings.toml -ssid = os.getenv("CIRCUITPY_WIFI_SSID") -password = os.getenv("CIRCUITPY_WIFI_PASSWORD") -osnu = os.getenv("OSN_Username") -osnp = os.getenv("OSN_Password") - -osn_cred = str(osnu) + ":" + str(osnp) -bytes_to_encode = b" " + str(osn_cred) + " " -base64_string = base64.encodebytes(bytes_to_encode) -base64cred = repr(base64_string)[2:-1] - -Debug_Auth = False # STREAMER WARNING this will show your credentials! -if Debug_Auth: - osn_cred = str(osnu) + ":" + str(osnp) - bytes_to_encode = b" " + str(osn_cred) + " " - print(repr(bytes_to_encode)) - base64_string = base64.encodebytes(bytes_to_encode) - print(repr(base64_string)[2:-1]) - base64cred = repr(base64_string)[2:-1] - print("Decoded Bytes:", str(base64cred)) +# Set debug to True for full JSON response. +# WARNING: makes credentials visible +DEBUG = False + +# Initalize Wifi, Socket Pool, Request Session +pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio) +ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio) +requests = adafruit_requests.Session(pool, ssl_context) + +# -- Base64 Conversion -- +OSN_CREDENTIALS = str(osnusername) + ":" + str(osnpassword) +# base64 encode and strip appended \n from bytearray +OSN_CREDENTIALS_B = binascii.b2a_base64(OSN_CREDENTIALS.encode()).strip() +BASE64_STRING = OSN_CREDENTIALS_B.decode() # bytearray + + +if DEBUG: + print("Base64 ByteArray: ", BASE64_STRING) # Requests URL - icao24 is their endpoint required for a transponder # example https://opensky-network.org/api/states/all?icao24=a808c5 -# OSN private requires your username:password to be base64 encoded -osn_header = {"Authorization": "Basic " + str(base64cred)} -OPENSKY_SOURCE = "https://opensky-network.org/api/states/all?" + "icao24=" + transponder +# OSN private: requires your website username:password to be base64 encoded +OPENSKY_HEADER = {"Authorization": "Basic " + BASE64_STRING} +OPENSKY_SOURCE = "https://opensky-network.org/api/states/all?" + "icao24=" + TRANSPONDER -# Converts seconds to human readable minutes/hours/days -def time_calc(input_time): # input_time in seconds +def time_calc(input_time): + """Converts seconds to minutes/hours/days""" if input_time < 60: - sleep_int = input_time - time_output = f"{sleep_int:.0f} seconds" - elif 60 <= input_time < 3600: - sleep_int = input_time / 60 - time_output = f"{sleep_int:.0f} minutes" - elif 3600 <= input_time < 86400: - sleep_int = input_time / 60 / 60 - time_output = f"{sleep_int:.1f} hours" - else: - sleep_int = input_time / 60 / 60 / 24 - time_output = f"{sleep_int:.1f} days" - return time_output + return f"{input_time:.0f} seconds" + if input_time < 3600: + return f"{input_time / 60:.0f} minutes" + if input_time < 86400: + return f"{input_time / 60 / 60:.0f} hours" + return f"{input_time / 60 / 60 / 24:.1f} days" def _format_datetime(datetime): - return "{:02}/{:02}/{} {:02}:{:02}:{:02}".format( - datetime.tm_mon, - datetime.tm_mday, - datetime.tm_year, - datetime.tm_hour, - datetime.tm_min, - datetime.tm_sec, + return ( + f"{datetime.tm_mon:02}/" + + f"{datetime.tm_mday:02}/" + + f"{datetime.tm_year:02} " + + f"{datetime.tm_hour:02}:" + + f"{datetime.tm_min:02}:" + + f"{datetime.tm_sec:02}" ) -# Connect to Wi-Fi -print("\n===============================") -print("Connecting to WiFi...") -request = adafruit_requests.Session(pool, ssl.create_default_context()) -while not wifi.radio.ipv4_address: +while True: + # Connect to Wi-Fi + print("\nConnecting to WiFi...") + while not wifi.radio.ipv4_address: + try: + wifi.radio.connect(ssid, password) + except ConnectionError as e: + print("❌ Connection Error:", e) + print("Retrying in 10 seconds") + print("✅ Wifi!") + try: - wifi.radio.connect(ssid, password) - except ConnectionError as e: - print("Connection Error:", e) - print("Retrying in 10 seconds") - time.sleep(10) -print("Connected!\n") + print(" | Attempting to GET OpenSky-Network Single Private Flight JSON!") + print(" | Website Credentials Required! Allows more daily calls than Public.") + try: + opensky_response = requests.get(url=OPENSKY_SOURCE, headers=OPENSKY_HEADER) + opensky_json = opensky_response.json() + except ConnectionError as e: + print("Connection Error:", e) + print("Retrying in 10 seconds") -while True: - # STREAMER WARNING this will show your credentials! - debug_request = False # Set True to see full request - if debug_request: - print("Full API HEADER: ", str(osn_header)) - print("Full API GET URL: ", OPENSKY_SOURCE) - print("===============================") + print(" | ✅ OpenSky-Network JSON!") - print("\nAttempting to GET OpenSky-Network Data!") - opensky_response = request.get(url=OPENSKY_SOURCE, headers=osn_header).json() + if DEBUG: + print("Full API GET URL: ", OPENSKY_SOURCE) + print("Full API GET Header: ", OPENSKY_HEADER) + print(opensky_json) - # Print Full JSON to Serial (doesn't show credentials) - debug_response = False # Set True to see full response - if debug_response: - dump_object = json.dumps(opensky_response) - print("JSON Dump: ", dump_object) + # ERROR MESSAGE RESPONSES + if "timestamp" in opensky_json: + osn_timestamp = opensky_json["timestamp"] + print(f"❌ Timestamp: {osn_timestamp}") - # Key:Value Serial Debug (doesn't show credentials) - osn_debug_keys = True # Set True to print Serial data - if osn_debug_keys: - try: - osn_flight = opensky_response["time"] - print("Current Unix Time: ", osn_flight) - - current_struct_time = time.localtime(osn_flight) - current_date = "{}".format(_format_datetime(current_struct_time)) - print(f"Unix to Readable Time: {current_date}") - - # Current flight data for single callsign (right now) - osn_single_flight_data = opensky_response["states"] - - if osn_single_flight_data is not None: - print("Flight Data: ", osn_single_flight_data) - transponder = opensky_response["states"][0][0] - print("Transponder: ", transponder) - callsign = opensky_response["states"][0][1] - print("Callsign: ", callsign) - country = opensky_response["states"][0][2] - print("Flight Country: ", country) + if "message" in opensky_json: + osn_message = opensky_json["message"] + print(f"❌ Message: {osn_message}") + + if "error" in opensky_json: + osn_error = opensky_json["error"] + print(f"❌ Error: {osn_error}") + + if "path" in opensky_json: + osn_path = opensky_json["path"] + print(f"❌ Path: {osn_path}") + + if "status" in opensky_json: + osn_status = opensky_json["status"] + print(f"❌ Status: {osn_status}") + + # Current flight data for single callsign (right now) + osn_single_flight_data = opensky_json["states"] + + if osn_single_flight_data is not None: + if DEBUG: + print(f" | | Single Private Flight Data: {osn_single_flight_data}") + + last_contact = opensky_json["states"][0][4] + # print(f" | | Last Contact Unix Time: {last_contact}") + lc_struct_time = time.localtime(last_contact) + lc_readable_time = f"{_format_datetime(lc_struct_time)}" + print(f" | | Last Contact: {lc_readable_time}") + + flight_transponder = opensky_json["states"][0][0] + print(f" | | Transponder: {flight_transponder}") + + callsign = opensky_json["states"][0][1] + print(f" | | Callsign: {callsign}") + + squawk = opensky_json["states"][0][14] + print(f" | | Squawk: {squawk}") + + country = opensky_json["states"][0][2] + print(f" | | Origin: {country}") + + longitude = opensky_json["states"][0][5] + print(f" | | Longitude: {longitude}") + + latitude = opensky_json["states"][0][6] + print(f" | | Latitude: {latitude}") + + # Return Air Flight data if not on ground + on_ground = opensky_json["states"][0][8] + if on_ground is True: + print(f" | | On Ground: {on_ground}") else: - print("Flight has no active data or you're polling too fast.") - - print("\nFinished!") - print("Board Uptime: ", time_calc(time.monotonic())) - print("Next Update: ", time_calc(sleep_time)) - time.sleep(sleep_time) - print("===============================") - - except (ConnectionError, ValueError, NameError) as e: - print("OSN Connection Error:", e) - print("Next Retry: ", time_calc(sleep_time)) - time.sleep(sleep_time) + altitude = opensky_json["states"][0][7] + print(f" | | Barometric Altitude: {altitude}") + + velocity = opensky_json["states"][0][9] + if velocity != "null": + print(f" | | Velocity: {velocity}") + + vertical_rate = opensky_json["states"][0][11] + if vertical_rate != "null": + print(f" | | Vertical Rate: {vertical_rate}") + + else: + print(" | | ❌ Flight has no active data or you're polling too fast.") + + opensky_response.close() + print("✂️ Disconnected from OpenSky-Network API") + + print("\nFinished!") + print(f"Board Uptime: {time_calc(time.monotonic())}") + print(f"Next Update: {time_calc(SLEEP_TIME)}") + print("===============================") + + except (ValueError, RuntimeError) as e: + print(f"Failed to get data, retrying\n {e}") + time.sleep(60) + break + time.sleep(SLEEP_TIME) diff --git a/examples/wifi/expanded/requests_wifi_api_openskynetwork_public.py b/examples/wifi/expanded/requests_wifi_api_openskynetwork_public.py index 5bcd69b..19198e1 100644 --- a/examples/wifi/expanded/requests_wifi_api_openskynetwork_public.py +++ b/examples/wifi/expanded/requests_wifi_api_openskynetwork_public.py @@ -1,13 +1,13 @@ -# SPDX-FileCopyrightText: 2023 DJDevon3 +# SPDX-FileCopyrightText: 2024 DJDevon3 # SPDX-License-Identifier: MIT -# Coded for Circuit Python 8.1 -# Adafruit Feather ESP32-S3 OpenSkyNetwork_Public_API_Example -import json +# Coded for Circuit Python 8.2.x +"""OpenSky-Network.org Public API Example""" +# pylint: disable=import-error + import os -import ssl import time -import socketpool +import adafruit_connection_manager import wifi import adafruit_requests @@ -17,121 +17,164 @@ # All active flights JSON: https://opensky-network.org/api/states/all PICK ONE! # JSON order: transponder, callsign, country # ACTIVE transpondes only, for multiple "c822af&icao24=cb3993&icao24=c63923" -transponder = "ab1644" +TRANSPONDER = "3c5ef8" -# Initialize WiFi Pool (There can be only 1 pool & top of script) -pool = socketpool.SocketPool(wifi.radio) +# Get WiFi details, ensure these are setup in settings.toml +ssid = os.getenv("CIRCUITPY_WIFI_SSID") +password = os.getenv("CIRCUITPY_WIFI_PASSWORD") +osnusername = os.getenv("OSN_USERNAME") # Website Credentials +osnpassword = os.getenv("OSN_PASSWORD") # Website Credentials -# Time between API refreshes +# API Polling Rate # 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour -# OpenSky-Networks will temp ban your IP for too many daily requests. +# OpenSky-Networks IP bans for too many requests, check rate limit. # https://openskynetwork.github.io/opensky-api/rest.html#limitations -sleep_time = 1800 +SLEEP_TIME = 1800 -# Get WiFi details, ensure these are setup in settings.toml -ssid = os.getenv("CIRCUITPY_WIFI_SSID") -password = os.getenv("CIRCUITPY_WIFI_PASSWORD") +# Set debug to True for full JSON response. +# WARNING: makes credentials visible +DEBUG = False + +# Initalize Wifi, Socket Pool, Request Session +pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio) +ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio) +requests = adafruit_requests.Session(pool, ssl_context) # Requests URL - icao24 is their endpoint required for a transponder # example https://opensky-network.org/api/states/all?icao24=a808c5 -OPENSKY_SOURCE = "https://opensky-network.org/api/states/all?" + "icao24=" + transponder +OPENSKY_SOURCE = "https://opensky-network.org/api/states/all?" + "icao24=" + TRANSPONDER -# Converts seconds to human readable minutes/hours/days -def time_calc(input_time): # input_time in seconds +def time_calc(input_time): + """Converts seconds to minutes/hours/days""" if input_time < 60: - sleep_int = input_time - time_output = f"{sleep_int:.0f} seconds" - elif 60 <= input_time < 3600: - sleep_int = input_time / 60 - time_output = f"{sleep_int:.0f} minutes" - elif 3600 <= input_time < 86400: - sleep_int = input_time / 60 / 60 - time_output = f"{sleep_int:.1f} hours" - else: - sleep_int = input_time / 60 / 60 / 24 - time_output = f"{sleep_int:.1f} days" - return time_output + return f"{input_time:.0f} seconds" + if input_time < 3600: + return f"{input_time / 60:.0f} minutes" + if input_time < 86400: + return f"{input_time / 60 / 60:.0f} hours" + return f"{input_time / 60 / 60 / 24:.1f} days" def _format_datetime(datetime): - return "{:02}/{:02}/{} {:02}:{:02}:{:02}".format( - datetime.tm_mon, - datetime.tm_mday, - datetime.tm_year, - datetime.tm_hour, - datetime.tm_min, - datetime.tm_sec, + """F-String formatted struct time conversion""" + return ( + f"{datetime.tm_mon:02}/" + + f"{datetime.tm_mday:02}/" + + f"{datetime.tm_year:02} " + + f"{datetime.tm_hour:02}:" + + f"{datetime.tm_min:02}:" + + f"{datetime.tm_sec:02}" ) -# Connect to Wi-Fi -print("\n===============================") -print("Connecting to WiFi...") -requests = adafruit_requests.Session(pool, ssl.create_default_context()) -while not wifi.radio.ipv4_address: - try: - wifi.radio.connect(ssid, password) - except ConnectionError as e: - print("Connection Error:", e) - print("Retrying in 10 seconds") - time.sleep(10) -print("Connected!\n") - while True: - debug_request = True # Set true to see full request - if debug_request: - print("Full API GET URL: ", OPENSKY_SOURCE) - print("===============================") + # Connect to Wi-Fi + print("\nConnecting to WiFi...") + while not wifi.radio.ipv4_address: + try: + wifi.radio.connect(ssid, password) + except ConnectionError as e: + print("❌ Connection Error:", e) + print("Retrying in 10 seconds") + print("✅ Wifi!") + try: - print("\nAttempting to GET OpenSky-Network Stats!") - opensky_response = requests.get(url=OPENSKY_SOURCE) - osn_json = opensky_response.json() - except (ConnectionError, ValueError, NameError) as e: - print("Host No Response Error:", e) - - # Print Full JSON to Serial - debug_response = False # Set true to see full response - if debug_response: - dump_object = json.dumps(osn_json) - print("JSON Dump: ", dump_object) - - # Print to Serial - osn_debug_keys = True # Set true to print Serial data - if osn_debug_keys: + print(" | Attempting to GET OpenSky-Network Single Flight JSON!") try: - osn_flight = osn_json["time"] - print("Current Unix Time: ", osn_flight) - - current_struct_time = time.localtime(osn_flight) - current_date = "{}".format(_format_datetime(current_struct_time)) - print(f"Unix to Readable Time: {current_date}") - - osn_single_flight_data = osn_json["states"] - if osn_single_flight_data is not None: - print("Flight Data: ", osn_single_flight_data) - transponder = osn_json["states"][0][0] - print("Transponder: ", transponder) - callsign = osn_json["states"][0][1] - print("Callsign: ", callsign) - country = osn_json["states"][0][2] - print("Flight Country: ", country) + opensky_response = requests.get(url=OPENSKY_SOURCE) + opensky_json = opensky_response.json() + except ConnectionError as e: + print("Connection Error:", e) + print("Retrying in 10 seconds") + + print(" | ✅ OpenSky-Network JSON!") + + if DEBUG: + print("Full API GET URL: ", OPENSKY_SOURCE) + print(opensky_json) + + # ERROR MESSAGE RESPONSES + if "timestamp" in opensky_json: + osn_timestamp = opensky_json["timestamp"] + print(f"❌ Timestamp: {osn_timestamp}") + + if "message" in opensky_json: + osn_message = opensky_json["message"] + print(f"❌ Message: {osn_message}") + + if "error" in opensky_json: + osn_error = opensky_json["error"] + print(f"❌ Error: {osn_error}") + + if "path" in opensky_json: + osn_path = opensky_json["path"] + print(f"❌ Path: {osn_path}") + + if "status" in opensky_json: + osn_status = opensky_json["status"] + print(f"❌ Status: {osn_status}") + + # Current flight data for single callsign (right now) + osn_single_flight_data = opensky_json["states"] + + if osn_single_flight_data is not None: + if DEBUG: + print(f" | | Single Flight Public Data: {osn_single_flight_data}") + + last_contact = opensky_json["states"][0][4] + # print(f" | | Last Contact Unix Time: {last_contact}") + lc_struct_time = time.localtime(last_contact) + lc_readable_time = f"{_format_datetime(lc_struct_time)}" + print(f" | | Last Contact: {lc_readable_time}") + + flight_transponder = opensky_json["states"][0][0] + print(f" | | Transponder: {flight_transponder}") + + callsign = opensky_json["states"][0][1] + print(f" | | Callsign: {callsign}") + + squawk = opensky_json["states"][0][14] + print(f" | | Squawk: {squawk}") + + country = opensky_json["states"][0][2] + print(f" | | Origin: {country}") + + longitude = opensky_json["states"][0][5] + print(f" | | Longitude: {longitude}") + + latitude = opensky_json["states"][0][6] + print(f" | | Latitude: {latitude}") + + # Return Air Flight data if not on ground + on_ground = opensky_json["states"][0][8] + if on_ground is True: + print(f" | | On Ground: {on_ground}") else: - print("This flight has no active data or you're polling too fast.") - print( - "Read: https://openskynetwork.github.io/opensky-api/rest.html#limitations" - ) - print( - "Public Limits: 10 second max poll rate & 400 weighted calls daily" - ) - - print("\nFinished!") - print("Board Uptime: ", time_calc(time.monotonic())) - print("Next Update: ", time_calc(sleep_time)) - time.sleep(sleep_time) - print("===============================") - - except (ConnectionError, ValueError, NameError) as e: - print("OSN Connection Error:", e) - print("Next Retry: ", time_calc(sleep_time)) - time.sleep(sleep_time) + altitude = opensky_json["states"][0][7] + print(f" | | Barometric Altitude: {altitude}") + + velocity = opensky_json["states"][0][9] + if velocity != "null": + print(f" | | Velocity: {velocity}") + + vertical_rate = opensky_json["states"][0][11] + if vertical_rate != "null": + print(f" | | Vertical Rate: {vertical_rate}") + else: + print("This flight has no active data or you're polling too fast.") + print("Public Limits: 10 second max poll & 400 weighted calls daily") + + opensky_response.close() + print("✂️ Disconnected from OpenSky-Network API") + + print("\nFinished!") + print(f"Board Uptime: {time_calc(time.monotonic())}") + print(f"Next Update: {time_calc(SLEEP_TIME)}") + print("===============================") + + except (ValueError, RuntimeError) as e: + print(f"Failed to get data, retrying\n {e}") + time.sleep(60) + break + time.sleep(SLEEP_TIME) diff --git a/examples/wifi/expanded/requests_wifi_api_twitter.py b/examples/wifi/expanded/requests_wifi_api_twitter.py deleted file mode 100644 index 4dcdfa6..0000000 --- a/examples/wifi/expanded/requests_wifi_api_twitter.py +++ /dev/null @@ -1,120 +0,0 @@ -# SPDX-FileCopyrightText: 2022 DJDevon3 for Adafruit Industries -# SPDX-License-Identifier: MIT -# Coded for Circuit Python 8.0 -"""DJDevon3 Adafruit Feather ESP32-S2 Twitter_API_Example""" -import gc -import json -import os -import ssl -import time - -import socketpool -import wifi - -import adafruit_requests - -# Twitter developer account bearer token required. -# Ensure these are uncommented and in secrets.py or .env -# "TW_userid": "Your Twitter user id", # numerical id not username -# "TW_bearer_token": "Your long API Bearer token", - -# Initialize WiFi Pool (There can be only 1 pool & top of script) -pool = socketpool.SocketPool(wifi.radio) - -# Time between API refreshes -# 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour -sleep_time = 900 - -# Get WiFi details, ensure these are setup in settings.toml -ssid = os.getenv("CIRCUITPY_WIFI_SSID") -password = os.getenv("CIRCUITPY_WIFI_PASSWORD") -tw_userid = os.getenv("TW_userid") -tw_bearer_token = os.getenv("TW_bearer_token") - -if sleep_time < 60: - sleep_time_conversion = "seconds" - sleep_int = sleep_time -elif 60 <= sleep_time < 3600: - sleep_int = sleep_time / 60 - sleep_time_conversion = "minutes" -elif 3600 <= sleep_time < 86400: - sleep_int = sleep_time / 60 / 60 - sleep_time_conversion = "hours" -else: - sleep_int = sleep_time / 60 / 60 / 24 - sleep_time_conversion = "days" - -# Used with any Twitter 0auth request. -twitter_header = {"Authorization": "Bearer " + tw_bearer_token} -TW_SOURCE = ( - "https://api.twitter.com/2/users/" - + tw_userid - + "?user.fields=public_metrics,created_at,pinned_tweet_id" - + "&expansions=pinned_tweet_id" - + "&tweet.fields=created_at,public_metrics,source,context_annotations,entities" -) - -# Connect to Wi-Fi -print("\n===============================") -print("Connecting to WiFi...") -requests = adafruit_requests.Session(pool, ssl.create_default_context()) -while not wifi.radio.ipv4_address: - try: - wifi.radio.connect(ssid, password) - except ConnectionError as e: - print("Connection Error:", e) - print("Retrying in 10 seconds") - time.sleep(10) - gc.collect() -print("Connected!\n") - -while True: - try: - print("\nAttempting to GET Twitter Stats!") # -------------------------------- - debug_request = False # Set true to see full request - if debug_request: - print("Full API GET URL: ", TW_SOURCE) - print("===============================") - try: - twitter_response = requests.get(url=TW_SOURCE, headers=twitter_header) - tw_json = twitter_response.json() - except ConnectionError as e: - print("Connection Error:", e) - print("Retrying in 10 seconds") - - # Print Full JSON to Serial - debug_response = False # Set true to see full response - if debug_response: - dump_object = json.dumps(tw_json) - print("JSON Dump: ", dump_object) - - # Print to Serial - tw_debug_keys = True # Set true to print Serial data - if tw_debug_keys: - tw_userid = tw_json["data"]["id"] - print("User ID: ", tw_userid) - - tw_username = tw_json["data"]["name"] - print("Name: ", tw_username) - - tw_join_date = tw_json["data"]["created_at"] - print("Member Since: ", tw_join_date) - - tw_tweets = tw_json["data"]["public_metrics"]["tweet_count"] - print("Tweets: ", tw_tweets) - - tw_followers = tw_json["data"]["public_metrics"]["followers_count"] - print("Followers: ", tw_followers) - - print("Monotonic: ", time.monotonic()) - - print("\nFinished!") - print("Next Update in %s %s" % (int(sleep_int), sleep_time_conversion)) - print("===============================") - gc.collect() - - except (ValueError, RuntimeError) as e: - print("Failed to get data, retrying\n", e) - time.sleep(60) - continue - time.sleep(sleep_time)