diff --git a/examples/wifi/expanded/requests_wifi_adafruit_discord_active_online.py b/examples/wifi/expanded/requests_wifi_adafruit_discord_active_online.py index dec0e6e..3ebcaf4 100644 --- a/examples/wifi/expanded/requests_wifi_adafruit_discord_active_online.py +++ b/examples/wifi/expanded/requests_wifi_adafruit_discord_active_online.py @@ -2,7 +2,6 @@ # SPDX-License-Identifier: MIT # Coded for Circuit Python 8.2.x """Discord Active Online Shields.IO Example""" -# pylint: disable=import-error import os import time diff --git a/examples/wifi/expanded/requests_wifi_api_discord.py b/examples/wifi/expanded/requests_wifi_api_discord.py index 83b8f8f..08ad16c 100644 --- a/examples/wifi/expanded/requests_wifi_api_discord.py +++ b/examples/wifi/expanded/requests_wifi_api_discord.py @@ -2,7 +2,6 @@ # SPDX-License-Identifier: MIT # Coded for Circuit Python 8.2.x """Discord Web Scrape Example""" -# pylint: disable=import-error import os import time diff --git a/examples/wifi/expanded/requests_wifi_api_fitbit.py b/examples/wifi/expanded/requests_wifi_api_fitbit.py index 802995c..206a56d 100644 --- a/examples/wifi/expanded/requests_wifi_api_fitbit.py +++ b/examples/wifi/expanded/requests_wifi_api_fitbit.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: MIT # Coded for Circuit Python 8.2.x """Fitbit API Example""" -# pylint: disable=import-error, disable=no-member +# pylint: disable=no-member import os import time diff --git a/examples/wifi/expanded/requests_wifi_api_github.py b/examples/wifi/expanded/requests_wifi_api_github.py index f1f86e3..ed8aaae 100644 --- a/examples/wifi/expanded/requests_wifi_api_github.py +++ b/examples/wifi/expanded/requests_wifi_api_github.py @@ -1,103 +1,107 @@ -# SPDX-FileCopyrightText: 2022 DJDevon3 for Adafruit Industries +# SPDX-FileCopyrightText: 2024 DJDevon3 # SPDX-License-Identifier: MIT -# Coded for Circuit Python 8.0 -"""DJDevon3 Adafruit Feather ESP32-S2 Github_API_Example""" -import gc -import json +# Coded for Circuit Python 9.x +"""Github API Example""" + import os -import ssl import time -import socketpool +import adafruit_connection_manager import wifi import adafruit_requests -# Initialize WiFi Pool (There can be only 1 pool & top of script) -pool = socketpool.SocketPool(wifi.radio) - -# Time between API refreshes -# 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour -sleep_time = 900 +# Github developer token required. +username = os.getenv("GITHUB_USERNAME") +token = os.getenv("GITHUB_TOKEN") # Get WiFi details, ensure these are setup in settings.toml ssid = os.getenv("CIRCUITPY_WIFI_SSID") password = os.getenv("CIRCUITPY_WIFI_PASSWORD") -# Github developer token required. -github_username = os.getenv("Github_username") -github_token = os.getenv("Github_token") - -if sleep_time < 60: - sleep_time_conversion = "seconds" - sleep_int = sleep_time -elif 60 <= sleep_time < 3600: - sleep_int = sleep_time / 60 - sleep_time_conversion = "minutes" -elif 3600 <= sleep_time < 86400: - sleep_int = sleep_time / 60 / 60 - sleep_time_conversion = "hours" -else: - sleep_int = sleep_time / 60 / 60 / 24 - sleep_time_conversion = "days" - -github_header = {"Authorization": " token " + github_token} -GH_SOURCE = "https://api.github.com/users/" + github_username - -# Connect to Wi-Fi -print("\n===============================") -print("Connecting to WiFi...") -requests = adafruit_requests.Session(pool, ssl.create_default_context()) -while not wifi.radio.ipv4_address: - try: - wifi.radio.connect(ssid, password) - except ConnectionError as e: - print("Connection Error:", e) - print("Retrying in 10 seconds") - time.sleep(10) - gc.collect() -print("Connected!\n") + +# API Polling Rate +# 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour +SLEEP_TIME = 900 + +# Set debug to True for full JSON response. +# WARNING: may include visible credentials +DEBUG = False + +# Initalize Wifi, Socket Pool, Request Session +pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio) +ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio) +requests = adafruit_requests.Session(pool, ssl_context) + +GITHUB_HEADER = {"Authorization": " token " + token} +GITHUB_SOURCE = "https://api.github.com/users/" + username + + +def time_calc(input_time): + """Converts seconds to minutes/hours/days""" + if input_time < 60: + return f"{input_time:.0f} seconds" + if input_time < 3600: + return f"{input_time / 60:.0f} minutes" + if input_time < 86400: + return f"{input_time / 60 / 60:.0f} hours" + return f"{input_time / 60 / 60 / 24:.1f} days" + while True: + # Connect to Wi-Fi + print("\nConnecting to WiFi...") + while not wifi.radio.ipv4_address: + try: + wifi.radio.connect(ssid, password) + except ConnectionError as e: + print("❌ Connection Error:", e) + print("Retrying in 10 seconds") + print("✅ Wifi!") + try: - print("\nAttempting to GET GITHUB Stats!") # -------------------------------- - # Print Request to Serial - debug_request = False # Set true to see full request - if debug_request: - print("Full API GET URL: ", GH_SOURCE) - print("===============================") + print(" | Attempting to GET Github JSON!") try: - github_response = requests.get(url=GH_SOURCE, headers=github_header).json() + github_response = requests.get(url=GITHUB_SOURCE, headers=GITHUB_HEADER) + github_json = github_response.json() except ConnectionError as e: print("Connection Error:", e) print("Retrying in 10 seconds") + print(" | ✅ Github JSON!") + + github_joined = github_json["created_at"] + print(" | | Join Date: ", github_joined) + + github_id = github_json["id"] + print(" | | UserID: ", github_id) + + github_location = github_json["location"] + print(" | | Location: ", github_location) - # Print Response to Serial - debug_response = False # Set true to see full response - if debug_response: - dump_object = json.dumps(github_response) - print("JSON Dump: ", dump_object) + github_name = github_json["name"] + print(" | | Username: ", github_name) - # Print Keys to Serial - gh_debug_keys = True # Set True to print Serial data - if gh_debug_keys: - github_id = github_response["id"] - print("UserID: ", github_id) + github_repos = github_json["public_repos"] + print(" | | Respositores: ", github_repos) - github_username = github_response["name"] - print("Username: ", github_username) + github_followers = github_json["followers"] + print(" | | Followers: ", github_followers) + github_bio = github_json["bio"] + print(" | | Bio: ", github_bio) - github_followers = github_response["followers"] - print("Followers: ", github_followers) + if DEBUG: + print("Full API GET URL: ", GITHUB_SOURCE) + print(github_json) - print("Monotonic: ", time.monotonic()) + github_response.close() + print("✂️ Disconnected from Github API") print("\nFinished!") - print("Next Update in %s %s" % (int(sleep_int), sleep_time_conversion)) + print(f"Board Uptime: {time_calc(time.monotonic())}") + print(f"Next Update: {time_calc(SLEEP_TIME)}") print("===============================") - gc.collect() except (ValueError, RuntimeError) as e: - print("Failed to get data, retrying\n", e) + print(f"Failed to get data, retrying\n {e}") time.sleep(60) - continue - time.sleep(sleep_time) + break + time.sleep(SLEEP_TIME) diff --git a/examples/wifi/expanded/requests_wifi_api_mastodon.py b/examples/wifi/expanded/requests_wifi_api_mastodon.py index f6bce9f..fd4266e 100644 --- a/examples/wifi/expanded/requests_wifi_api_mastodon.py +++ b/examples/wifi/expanded/requests_wifi_api_mastodon.py @@ -2,7 +2,6 @@ # SPDX-License-Identifier: MIT # Coded for Circuit Python 8.2.x """Mastodon API Example""" -# pylint: disable=import-error import os import time diff --git a/examples/wifi/expanded/requests_wifi_api_openskynetwork_private_area.py b/examples/wifi/expanded/requests_wifi_api_openskynetwork_private_area.py index 8cd08a2..1652a73 100644 --- a/examples/wifi/expanded/requests_wifi_api_openskynetwork_private_area.py +++ b/examples/wifi/expanded/requests_wifi_api_openskynetwork_private_area.py @@ -1,155 +1,164 @@ -# SPDX-FileCopyrightText: 2023 DJDevon3 +# SPDX-FileCopyrightText: 2024 DJDevon3 # SPDX-License-Identifier: MIT -# Coded for Circuit Python 8.1 -# DJDevon3 ESP32-S3 OpenSkyNetwork_Private_Area_API_Example +# Coded for Circuit Python 8.2.x +"""OpenSky-Network.org Private Area API Example""" -import json +import binascii import os -import ssl import time -import circuitpython_base64 as base64 -import socketpool +import adafruit_connection_manager import wifi import adafruit_requests # OpenSky-Network.org Website Login required for this API +# Increased call limit vs Public. # REST API: https://openskynetwork.github.io/opensky-api/rest.html - # Retrieves all traffic within a geographic area (Orlando example) -latmin = "27.22" # east bounding box -latmax = "28.8" # west bounding box -lonmin = "-81.46" # north bounding box -lonmax = "-80.40" # south bounding box +LATMIN = "27.22" # east bounding box +LATMAX = "28.8" # west bounding box +LONMIN = "-81.46" # north bounding box +LONMAX = "-80.40" # south bounding box -# Initialize WiFi Pool (There can be only 1 pool & top of script) -pool = socketpool.SocketPool(wifi.radio) +# Get WiFi details, ensure these are setup in settings.toml +ssid = os.getenv("CIRCUITPY_WIFI_SSID") +password = os.getenv("CIRCUITPY_WIFI_PASSWORD") +osnusername = os.getenv("OSN_USERNAME") # Website Credentials +osnpassword = os.getenv("OSN_PASSWORD") # Website Credentials -# Time between API refreshes +# API Polling Rate # 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour # OpenSky-Networks IP bans for too many requests, check rate limit. # https://openskynetwork.github.io/opensky-api/rest.html#limitations -sleep_time = 1800 +SLEEP_TIME = 1800 -# Get WiFi details, ensure these are setup in settings.toml -ssid = os.getenv("CIRCUITPY_WIFI_SSID") -password = os.getenv("CIRCUITPY_WIFI_PASSWORD") -# No token required, only website login -osnu = os.getenv("OSN_Username") -osnp = os.getenv("OSN_Password") - -osn_cred = str(osnu) + ":" + str(osnp) -bytes_to_encode = b" " + str(osn_cred) + " " -base64_string = base64.encodebytes(bytes_to_encode) -base64cred = repr(base64_string)[2:-1] - -Debug_Auth = False # STREAMER WARNING this will show your credentials! -if Debug_Auth: - osn_cred = str(osnu) + ":" + str(osnp) - bytes_to_encode = b" " + str(osn_cred) + " " - print(repr(bytes_to_encode)) - base64_string = base64.encodebytes(bytes_to_encode) - print(repr(base64_string)[2:-1]) - base64cred = repr(base64_string)[2:-1] - print("Decoded Bytes:", str(base64cred)) - -# OSN requires your username:password to be base64 encoded -# so technically it's not transmitted in the clear but w/e -osn_header = {"Authorization": "Basic " + str(base64cred)} - -# Example request of all traffic over Florida, geographic areas cost less per call. +# Set debug to True for full JSON response. +# WARNING: makes credentials visible. based on how many flights +# in your area, full response could crash microcontroller +DEBUG = False + +# Initalize Wifi, Socket Pool, Request Session +pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio) +ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio) +requests = adafruit_requests.Session(pool, ssl_context) + +# -- Base64 Conversion -- +OSN_CREDENTIALS = str(osnusername) + ":" + str(osnpassword) +# base64 encode and strip appended \n from bytearray +OSN_CREDENTIALS_B = binascii.b2a_base64(OSN_CREDENTIALS.encode()).strip() +BASE64_STRING = OSN_CREDENTIALS_B.decode() # bytearray + +if DEBUG: + print("Base64 ByteArray: ", BASE64_STRING) + +# Area requires OpenSky-Network.org username:password to be base64 encoded +OSN_HEADER = {"Authorization": "Basic " + BASE64_STRING} + +# Example request of all traffic over Florida. +# Geographic areas calls cost less against the limit. # https://opensky-network.org/api/states/all?lamin=25.21&lomin=-84.36&lamax=30.0&lomax=-78.40 OPENSKY_SOURCE = ( "https://opensky-network.org/api/states/all?" + "lamin=" - + latmin + + LATMIN + "&lomin=" - + lonmin + + LONMIN + "&lamax=" - + latmax + + LATMAX + "&lomax=" - + lonmax + + LONMAX ) -# Converts seconds to human readable minutes/hours/days -def time_calc(input_time): # input_time in seconds +def time_calc(input_time): + """Converts seconds to minutes/hours/days""" if input_time < 60: - sleep_int = input_time - time_output = f"{sleep_int:.0f} seconds" - elif 60 <= input_time < 3600: - sleep_int = input_time / 60 - time_output = f"{sleep_int:.0f} minutes" - elif 3600 <= input_time < 86400: - sleep_int = input_time / 60 / 60 - time_output = f"{sleep_int:.1f} hours" - else: - sleep_int = input_time / 60 / 60 / 24 - time_output = f"{sleep_int:.1f} days" - return time_output + return f"{input_time:.0f} seconds" + if input_time < 3600: + return f"{input_time / 60:.0f} minutes" + if input_time < 86400: + return f"{input_time / 60 / 60:.0f} hours" + return f"{input_time / 60 / 60 / 24:.1f} days" def _format_datetime(datetime): - return "{:02}/{:02}/{} {:02}:{:02}:{:02}".format( - datetime.tm_mon, - datetime.tm_mday, - datetime.tm_year, - datetime.tm_hour, - datetime.tm_min, - datetime.tm_sec, + """F-String formatted struct time conversion""" + return ( + f"{datetime.tm_mon:02}/" + + f"{datetime.tm_mday:02}/" + + f"{datetime.tm_year:02} " + + f"{datetime.tm_hour:02}:" + + f"{datetime.tm_min:02}:" + + f"{datetime.tm_sec:02}" ) -# Connect to Wi-Fi -print("\n===============================") -print("Connecting to WiFi...") -request = adafruit_requests.Session(pool, ssl.create_default_context()) -while not wifi.radio.ipv4_address: +while True: + # Connect to Wi-Fi + print("\nConnecting to WiFi...") + while not wifi.radio.ipv4_address: + try: + wifi.radio.connect(ssid, password) + except ConnectionError as e: + print("❌ Connection Error:", e) + print("Retrying in 10 seconds") + print("✅ Wifi!") + try: - wifi.radio.connect(ssid, password) - except ConnectionError as e: - print("Connection Error:", e) - print("Retrying in 10 seconds") - time.sleep(10) -print("Connected!\n") + print(" | Attempting to GET OpenSky-Network Area Flights JSON!") + try: + opensky_response = requests.get(url=OPENSKY_SOURCE, headers=OSN_HEADER) + opensky_json = opensky_response.json() + except ConnectionError as e: + print("Connection Error:", e) + print("Retrying in 10 seconds") -while True: - # STREAMER WARNING this will show your credentials! - debug_request = False # Set True to see full request - if debug_request: - print("Full API HEADER: ", str(osn_header)) - print("Full API GET URL: ", OPENSKY_SOURCE) - print("===============================") + print(" | ✅ OpenSky-Network JSON!") - print("\nAttempting to GET OpenSky-Network Data!") - opensky_response = request.get(url=OPENSKY_SOURCE, headers=osn_header).json() + if DEBUG: + print("Full API GET URL: ", OPENSKY_SOURCE) + print(opensky_json) - # Print Full JSON to Serial (doesn't show credentials) - debug_response = False # Set True to see full response - if debug_response: - dump_object = json.dumps(opensky_response) - print("JSON Dump: ", dump_object) + # ERROR MESSAGE RESPONSES + if "timestamp" in opensky_json: + osn_timestamp = opensky_json["timestamp"] + print(f"❌ Timestamp: {osn_timestamp}") - # Key:Value Serial Debug (doesn't show credentials) - osn_debug_keys = True # Set True to print Serial data - if osn_debug_keys: - try: - osn_flight = opensky_response["time"] - print("Current Unix Time: ", osn_flight) + if "message" in opensky_json: + osn_message = opensky_json["message"] + print(f"❌ Message: {osn_message}") + + if "error" in opensky_json: + osn_error = opensky_json["error"] + print(f"❌ Error: {osn_error}") + + if "path" in opensky_json: + osn_path = opensky_json["path"] + print(f"❌ Path: {osn_path}") - current_struct_time = time.localtime(osn_flight) - current_date = "{}".format(_format_datetime(current_struct_time)) - print(f"Unix to Readable Time: {current_date}") + if "status" in opensky_json: + osn_status = opensky_json["status"] + print(f"❌ Status: {osn_status}") - # Current flight data for single callsign (right now) - osn_all_flights = opensky_response["states"] + # Current flight data for single callsign (right now) + osn_all_flights = opensky_json["states"] + + if osn_all_flights is not None: + if DEBUG: + print(f" | | Area Flights Full Response: {osn_all_flights}") + + osn_time = opensky_json["time"] + # print(f" | | Last Contact Unix Time: {osn_time}") + osn_struct_time = time.localtime(osn_time) + osn_readable_time = f"{_format_datetime(osn_struct_time)}" + print(f" | | Timestamp: {osn_readable_time}") if osn_all_flights is not None: # print("Flight Data: ", osn_all_flights) for flights in osn_all_flights: - osn_t = f"Trans:{flights[0]} " - osn_c = f"Sign:{flights[1]} " + osn_t = f" | | Trans:{flights[0]} " + osn_c = f"Sign:{flights[1]}" osn_o = f"Origin:{flights[2]} " osn_tm = f"Time:{flights[3]} " osn_l = f"Last:{flights[4]} " @@ -171,16 +180,20 @@ def _format_datetime(datetime): string2 = f"{osn_la}{osn_ba}{osn_g}{osn_v}{osn_h}{osn_vr}" string3 = f"{osn_s}{osn_ga}{osn_sq}{osn_pr}{osn_ps}{osn_ca}" print(f"{string1}{string2}{string3}") - else: - print("Flight has no active data or you're polling too fast.") - - print("\nFinished!") - print("Board Uptime: ", time_calc(time.monotonic())) - print("Next Update: ", time_calc(sleep_time)) - time.sleep(sleep_time) - print("===============================") - - except (ConnectionError, ValueError, NameError) as e: - print("OSN Connection Error:", e) - print("Next Retry: ", time_calc(sleep_time)) - time.sleep(sleep_time) + + else: + print(" | | ❌ Area has no active data or you're polling too fast.") + + opensky_response.close() + print("✂️ Disconnected from OpenSky-Network API") + + print("\nFinished!") + print(f"Board Uptime: {time_calc(time.monotonic())}") + print(f"Next Update: {time_calc(SLEEP_TIME)}") + print("===============================") + + except (ValueError, RuntimeError) as e: + print(f"Failed to get data, retrying\n {e}") + time.sleep(60) + break + time.sleep(SLEEP_TIME) diff --git a/examples/wifi/expanded/requests_wifi_api_premiereleague.py b/examples/wifi/expanded/requests_wifi_api_premiereleague.py new file mode 100644 index 0000000..88524e9 --- /dev/null +++ b/examples/wifi/expanded/requests_wifi_api_premiereleague.py @@ -0,0 +1,85 @@ +# SPDX-FileCopyrightText: 2024 DJDevon3 +# SPDX-License-Identifier: MIT +# Coded for Circuit Python 8.2.x +"""Premiere League Total Players API Example""" + +import os +import time + +import adafruit_connection_manager +import adafruit_json_stream as json_stream +import wifi + +import adafruit_requests + +# Public API. No user or token required + +# Get WiFi details, ensure these are setup in settings.toml +ssid = os.getenv("CIRCUITPY_WIFI_SSID") +password = os.getenv("CIRCUITPY_WIFI_PASSWORD") + +# API Polling Rate +# 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour +SLEEP_TIME = 900 + +# Initalize Wifi, Socket Pool, Request Session +pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio) +ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio) +requests = adafruit_requests.Session(pool, ssl_context) + +# Publicly available data no header required +PREMIERE_LEAGUE_SOURCE = "https://fantasy.premierleague.com/api/bootstrap-static/" + + +def time_calc(input_time): + """Converts seconds to minutes/hours/days""" + if input_time < 60: + return f"{input_time:.0f} seconds" + if input_time < 3600: + return f"{input_time / 60:.0f} minutes" + if input_time < 86400: + return f"{input_time / 60 / 60:.0f} hours" + return f"{input_time / 60 / 60 / 24:.1f} days" + + +while True: + # Connect to Wi-Fi + print("\nConnecting to WiFi...") + while not wifi.radio.ipv4_address: + try: + wifi.radio.connect(ssid, password) + except ConnectionError as e: + print("❌ Connection Error:", e) + print("Retrying in 10 seconds") + print("✅ Wifi!") + + try: + print(" | Attempting to GET Premiere League JSON!") + + # Set debug to True for full JSON response. + # WARNING: may include visible credentials + # MICROCONTROLLER WARNING: might crash by returning too much data + DEBUG_RESPONSE = False + + try: + PREMIERE_LEAGUE_RESPONSE = requests.get(url=PREMIERE_LEAGUE_SOURCE) + pl_json = json_stream.load(PREMIERE_LEAGUE_RESPONSE.iter_content(32)) + except ConnectionError as e: + print(f"Connection Error: {e}") + print("Retrying in 10 seconds") + print(" | ✅ Premiere League JSON!") + + print(f" | Total Premiere League Players: {pl_json['total_players']}") + PREMIERE_LEAGUE_RESPONSE.close() + print("✂️ Disconnected from Premiere League") + + print("\nFinished!") + print(f"Board Uptime: {time.monotonic()}") + print(f"Next Update: {time_calc(SLEEP_TIME)}") + print("===============================") + + except (ValueError, RuntimeError) as e: + print(f"Failed to get data, retrying\n {e}") + time.sleep(60) + break + time.sleep(SLEEP_TIME) diff --git a/examples/wifi/expanded/requests_wifi_api_rocketlaunch_live.py b/examples/wifi/expanded/requests_wifi_api_rocketlaunch_live.py new file mode 100644 index 0000000..7cf9d61 --- /dev/null +++ b/examples/wifi/expanded/requests_wifi_api_rocketlaunch_live.py @@ -0,0 +1,135 @@ +# SPDX-FileCopyrightText: 2024 DJDevon3 +# SPDX-License-Identifier: MIT +# Coded for Circuit Python 9.0 +"""RocketLaunch.Live API Example""" + +import os +import time + +import adafruit_connection_manager +import wifi + +import adafruit_requests + +# Time between API refreshes +# 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour +SLEEP_TIME = 43200 + +# Get WiFi details, ensure these are setup in settings.toml +ssid = os.getenv("CIRCUITPY_WIFI_SSID") +password = os.getenv("CIRCUITPY_WIFI_PASSWORD") + + +def time_calc(input_time): + """Converts seconds to minutes/hours/days""" + if input_time < 60: + return f"{input_time:.0f} seconds" + if input_time < 3600: + return f"{input_time / 60:.0f} minutes" + if input_time < 86400: + return f"{input_time / 60 / 60:.0f} hours" + return f"{input_time / 60 / 60 / 24:.1f} days" + + +# Publicly available data no header required +# The number at the end is the amount of launches (max 5 free api) +ROCKETLAUNCH_SOURCE = "https://fdo.rocketlaunch.live/json/launches/next/1" + +# Initalize Wifi, Socket Pool, Request Session +pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio) +ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio) +requests = adafruit_requests.Session(pool, ssl_context) + +while True: + # Connect to Wi-Fi + print("\n===============================") + print("Connecting to WiFi...") + while not wifi.radio.ipv4_address: + try: + wifi.radio.connect(ssid, password) + except ConnectionError as e: + print("❌ Connection Error:", e) + print("Retrying in 10 seconds") + print("✅ Wifi!") + try: + # Print Request to Serial + print(" | Attempting to GET RocketLaunch.Live JSON!") + time.sleep(2) + debug_rocketlaunch_full_response = False + + try: + rocketlaunch_response = requests.get(url=ROCKETLAUNCH_SOURCE) + rocketlaunch_json = rocketlaunch_response.json() + except ConnectionError as e: + print("Connection Error:", e) + print("Retrying in 10 seconds") + print(" | ✅ RocketLaunch.Live JSON!") + + if debug_rocketlaunch_full_response: + print("Full API GET URL: ", ROCKETLAUNCH_SOURCE) + print(rocketlaunch_json) + + # JSON Endpoints + RLFN = str(rocketlaunch_json["result"][0]["name"]) + RLWO = str(rocketlaunch_json["result"][0]["win_open"]) + TZERO = str(rocketlaunch_json["result"][0]["t0"]) + RLWC = str(rocketlaunch_json["result"][0]["win_close"]) + RLP = str(rocketlaunch_json["result"][0]["provider"]["name"]) + RLVN = str(rocketlaunch_json["result"][0]["vehicle"]["name"]) + RLPN = str(rocketlaunch_json["result"][0]["pad"]["name"]) + RLLS = str(rocketlaunch_json["result"][0]["pad"]["location"]["name"]) + RLLD = str(rocketlaunch_json["result"][0]["launch_description"]) + RLM = str(rocketlaunch_json["result"][0]["mission_description"]) + RLDATE = str(rocketlaunch_json["result"][0]["date_str"]) + + # Print to serial & display label if endpoint not "None" + if RLDATE != "None": + print(f" | | Date: {RLDATE}") + if RLFN != "None": + print(f" | | Flight: {RLFN}") + if RLP != "None": + print(f" | | Provider: {RLP}") + if RLVN != "None": + print(f" | | Vehicle: {RLVN}") + + # Launch time can sometimes be Window Open to Close, T-Zero, or weird combination. + # Should obviously be standardized but they're not input that way. + # Have to account for every combination of 3 conditions. + # T-Zero Launch Time Conditionals + if RLWO == "None" and TZERO != "None" and RLWC != "None": + print(f" | | Window: {TZERO} | {RLWC}") + elif RLWO != "None" and TZERO != "None" and RLWC == "None": + print(f" | | Window: {RLWO} | {TZERO}") + elif RLWO != "None" and TZERO == "None" and RLWC != "None": + print(f" | | Window: {RLWO} | {RLWC}") + elif RLWO != "None" and TZERO != "None" and RLWC != "None": + print(f" | | Window: {RLWO} | {TZERO} | {RLWC}") + elif RLWO == "None" and TZERO != "None" and RLWC == "None": + print(f" | | Window: {TZERO}") + elif RLWO != "None" and TZERO == "None" and RLWC == "None": + print(f" | | Window: {RLWO}") + elif RLWO == "None" and TZERO == "None" and RLWC != "None": + print(f" | | Window: {RLWC}") + + if RLLS != "None": + print(f" | | Site: {RLLS}") + if RLPN != "None": + print(f" | | Pad: {RLPN}") + if RLLD != "None": + print(f" | | Description: {RLLD}") + if RLM != "None": + print(f" | | Mission: {RLM}") + + rocketlaunch_response.close() + print("✂️ Disconnected from RocketLaunch.Live API") + + print("\nFinished!") + print(f"Board Uptime: {time_calc(time.monotonic())}") + print(f"Next Update: {time_calc(SLEEP_TIME)}") + print("===============================") + + except (ValueError, RuntimeError) as e: + print("Failed to get data, retrying\n", e) + time.sleep(60) + break + time.sleep(SLEEP_TIME) diff --git a/examples/wifi/expanded/requests_wifi_api_steam.py b/examples/wifi/expanded/requests_wifi_api_steam.py index 208b6d6..fb1f7bd 100644 --- a/examples/wifi/expanded/requests_wifi_api_steam.py +++ b/examples/wifi/expanded/requests_wifi_api_steam.py @@ -1,41 +1,47 @@ -# SPDX-FileCopyrightText: 2022 DJDevon3 (Neradoc & Deshipu helped) for Adafruit Industries +# SPDX-FileCopyrightText: 2024 DJDevon3 # SPDX-License-Identifier: MIT -# Coded for Circuit Python 8.0 -"""DJDevon3 Adafruit Feather ESP32-S2 api_steam Example""" -import gc -import json +# Coded for Circuit Python 8.2.x +"""Steam API Get Owned Games Example""" + import os -import ssl import time -import socketpool +import adafruit_connection_manager import wifi import adafruit_requests # Steam API Docs: https://steamcommunity.com/dev # Steam API Key: https://steamcommunity.com/dev/apikey -# Steam Usernumber: Visit https://steamcommunity.com -# click on your profile icon, your usernumber will be in the browser url. +# Numerical Steam ID: Visit https://store.steampowered.com/account/ +# Your account name will be in big bold letters. +# Your numerical STEAM ID will be below in a very small font. # Get WiFi details, ensure these are setup in settings.toml ssid = os.getenv("CIRCUITPY_WIFI_SSID") password = os.getenv("CIRCUITPY_WIFI_PASSWORD") # Requires Steam Developer API key -steam_usernumber = os.getenv("steam_id") -steam_apikey = os.getenv("steam_api_key") - -# Initialize WiFi Pool (There can be only 1 pool & top of script) -pool = socketpool.SocketPool(wifi.radio) +steam_usernumber = os.getenv("STEAM_ID") +steam_apikey = os.getenv("STEAM_API_KEY") -# Time between API refreshes +# API Polling Rate # 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour -sleep_time = 900 +SLEEP_TIME = 3600 + +# Set debug to True for full JSON response. +# WARNING: Steam's full response will overload most microcontrollers +# SET TO TRUE IF YOU FEEL BRAVE =) +DEBUG = False + +# Initalize Wifi, Socket Pool, Request Session +pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio) +ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio) +requests = adafruit_requests.Session(pool, ssl_context) # Deconstruct URL (pylint hates long lines) # http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/ # ?key=XXXXXXXXXXXXXXXXXXXXX&include_played_free_games=1&steamid=XXXXXXXXXXXXXXXX&format=json -Steam_OwnedGames_URL = ( +STEAM_SOURCE = ( "http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?" + "key=" + steam_apikey @@ -45,75 +51,79 @@ + "&format=json" ) -if sleep_time < 60: - sleep_time_conversion = "seconds" - sleep_int = sleep_time -elif 60 <= sleep_time < 3600: - sleep_int = sleep_time / 60 - sleep_time_conversion = "minutes" -elif 3600 <= sleep_time < 86400: - sleep_int = sleep_time / 60 / 60 - sleep_time_conversion = "hours" -else: - sleep_int = sleep_time / 60 / 60 / 24 - sleep_time_conversion = "days" - -# Connect to Wi-Fi -print("\n===============================") -print("Connecting to WiFi...") -requests = adafruit_requests.Session(pool, ssl.create_default_context()) -while not wifi.radio.ipv4_address: - try: - wifi.radio.connect(ssid, password) - except ConnectionError as e: - print("Connection Error:", e) - print("Retrying in 10 seconds") - time.sleep(10) - gc.collect() -print("Connected!\n") + +def time_calc(input_time): + """Converts seconds to minutes/hours/days""" + if input_time < 60: + return f"{input_time:.0f} seconds" + if input_time < 3600: + return f"{input_time / 60:.0f} minutes" + if input_time < 86400: + return f"{input_time / 60 / 60:.0f} hours" + return f"{input_time / 60 / 60 / 24:.1f} days" + + +def _format_datetime(datetime): + """F-String formatted struct time conversion""" + return ( + f"{datetime.tm_mon:02}/" + + f"{datetime.tm_mday:02}/" + + f"{datetime.tm_year:02} " + + f"{datetime.tm_hour:02}:" + + f"{datetime.tm_min:02}:" + + f"{datetime.tm_sec:02}" + ) + while True: + # Connect to Wi-Fi + print("\nConnecting to WiFi...") + while not wifi.radio.ipv4_address: + try: + wifi.radio.connect(ssid, password) + except ConnectionError as e: + print("❌ Connection Error:", e) + print("Retrying in 10 seconds") + print("✅ Wifi!") + try: - print("\nAttempting to GET STEAM Stats!") # -------------------------------- - # Print Request to Serial - debug_request = False # Set true to see full request - if debug_request: - print("Full API GET URL: ", Steam_OwnedGames_URL) - print("===============================") + print(" | Attempting to GET Steam API JSON!") try: - steam_response = requests.get(url=Steam_OwnedGames_URL).json() + steam_response = requests.get(url=STEAM_SOURCE) + steam_json = steam_response.json() except ConnectionError as e: print("Connection Error:", e) print("Retrying in 10 seconds") - # Print Response to Serial - debug_response = False # Set true to see full response - if debug_response: - dump_object = json.dumps(steam_response) - print("JSON Dump: ", dump_object) - - # Print Keys to Serial - steam_debug_keys = True # Set True to print Serial data - if steam_debug_keys: - game_count = steam_response["response"]["game_count"] - print("Total Games: ", game_count) - total_minutes = 0 - - for game in steam_response["response"]["games"]: - total_minutes += game["playtime_forever"] - total_hours = total_minutes / 60 - total_days = total_minutes / 60 / 24 - print(f"Total Hours: {total_hours}") - print(f"Total Days: {total_days}") - - print("Monotonic: ", time.monotonic()) + print(" | ✅ Steam JSON!") + + if DEBUG: + print("Full API GET URL: ", STEAM_SOURCE) + print(steam_json) + + game_count = steam_json["response"]["game_count"] + print(f" | | Total Games: {game_count}") + TOTAL_MINUTES = 0 + + for game in steam_json["response"]["games"]: + TOTAL_MINUTES += game["playtime_forever"] + total_hours = TOTAL_MINUTES / 60 + total_days = TOTAL_MINUTES / 60 / 24 + total_years = TOTAL_MINUTES / 60 / 24 / 365 + print(f" | | Total Hours: {total_hours}") + print(f" | | Total Days: {total_days}") + print(f" | | Total Years: {total_years:.2f}") + + steam_response.close() + print("✂️ Disconnected from Steam API") + print("\nFinished!") - print("Next Update in %s %s" % (int(sleep_int), sleep_time_conversion)) + print(f"Board Uptime: {time_calc(time.monotonic())}") + print(f"Next Update: {time_calc(SLEEP_TIME)}") print("===============================") - gc.collect() except (ValueError, RuntimeError) as e: - print("Failed to get data, retrying\n", e) + print(f"Failed to get data, retrying\n {e}") time.sleep(60) - continue - time.sleep(sleep_time) + break + time.sleep(SLEEP_TIME) diff --git a/examples/wifi/expanded/requests_wifi_api_twitch.py b/examples/wifi/expanded/requests_wifi_api_twitch.py index 716caa9..1f02e5c 100644 --- a/examples/wifi/expanded/requests_wifi_api_twitch.py +++ b/examples/wifi/expanded/requests_wifi_api_twitch.py @@ -1,44 +1,48 @@ -# SPDX-FileCopyrightText: 2023 DJDevon3 +# SPDX-FileCopyrightText: 2024 DJDevon3 # SPDX-License-Identifier: MIT # Coded for Circuit Python 8.2.x -# Twitch_API_Example +"""Twitch API Example""" import os -import ssl import time -import socketpool +import adafruit_connection_manager import wifi import adafruit_requests -# Initialize WiFi Pool (There can be only 1 pool & top of script) -pool = socketpool.SocketPool(wifi.radio) - # Twitch Developer Account & oauth App Required: # Visit https://dev.twitch.tv/console to create an app - -# Ensure these are in secrets.py or settings.toml -# "Twitch_ClientID": "Your Developer APP ID Here", -# "Twitch_Client_Secret": "APP ID secret here", -# "Twitch_UserID": "Your Twitch UserID here", +# Ensure these are in settings.toml +# TWITCH_CLIENT_ID = "Your Developer APP ID Here" +# TWITCH_CLIENT_SECRET = "APP ID secret here" +# TWITCH_USER_ID = "Your Twitch UserID here" # Get WiFi details, ensure these are setup in settings.toml ssid = os.getenv("CIRCUITPY_WIFI_SSID") password = os.getenv("CIRCUITPY_WIFI_PASSWORD") -twitch_client_id = os.getenv("Twitch_ClientID") -twitch_client_secret = os.getenv("Twitch_Client_Secret") +TWITCH_CID = os.getenv("TWITCH_CLIENT_ID") +TWITCH_CS = os.getenv("TWITCH_CLIENT_SECRET") # For finding your Twitch User ID # https://www.streamweasels.com/tools/convert-twitch-username-to-user-id/ -twitch_user_id = os.getenv("Twitch_UserID") # User ID you want endpoints from +TWITCH_UID = os.getenv("TWITCH_USER_ID") -# Time between API refreshes +# API Polling Rate # 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour -sleep_time = 900 +SLEEP_TIME = 900 + +# Set DEBUG to True for full JSON response. +# STREAMER WARNING: Credentials will be viewable +DEBUG = False + +# Initalize Wifi, Socket Pool, Request Session +pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio) +ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio) +requests = adafruit_requests.Session(pool, ssl_context) -# Converts seconds to minutes/hours/days def time_calc(input_time): + """Converts seconds to minutes/hours/days""" if input_time < 60: return f"{input_time:.0f} seconds" if input_time < 3600: @@ -48,42 +52,44 @@ def time_calc(input_time): return f"{input_time / 60 / 60 / 24:.1f} days" +def _format_datetime(datetime): + """F-String formatted struct time conversion""" + return ( + f"{datetime.tm_mon:02}/" + + f"{datetime.tm_mday:02}/" + + f"{datetime.tm_year:02} " + + f"{datetime.tm_hour:02}:" + + f"{datetime.tm_min:02}:" + + f"{datetime.tm_sec:02}" + ) + + # First we use Client ID & Client Secret to create a token with POST # No user interaction is required for this type of scope (implicit grant flow) twitch_0auth_header = {"Content-Type": "application/x-www-form-urlencoded"} TWITCH_0AUTH_TOKEN = "https://id.twitch.tv/oauth2/token" -# Connect to Wi-Fi -print("\n===============================") -print("Connecting to WiFi...") -requests = adafruit_requests.Session(pool, ssl.create_default_context()) -while not wifi.radio.connected: - try: - wifi.radio.connect(ssid, password) - except ConnectionError as e: - print("Connection Error:", e) - print("Retrying in 10 seconds") - time.sleep(10) -print("Connected!\n") - while True: + # Connect to Wi-Fi + print("\nConnecting to WiFi...") + while not wifi.radio.ipv4_address: + try: + wifi.radio.connect(ssid, password) + except ConnectionError as e: + print("❌ Connection Error:", e) + print("Retrying in 10 seconds") + print("✅ Wifi!") + try: - # ----------------------------- POST FOR BEARER TOKEN ----------------------- - print( - "Attempting Bearer Token Request!" - ) # --------------------------------------- - # Print Request to Serial - debug_bearer_request = ( - False # STREAMER WARNING: your client secret will be viewable - ) - if debug_bearer_request: - print("Full API GET URL: ", TWITCH_0AUTH_TOKEN) - print("===============================") + # ------------- POST FOR BEARER TOKEN ----------------- + print(" | Attempting Bearer Token Request!") + if DEBUG: + print(f"Full API GET URL: {TWITCH_0AUTH_TOKEN}") twitch_0auth_data = ( "&client_id=" - + twitch_client_id + + TWITCH_CID + "&client_secret=" - + twitch_client_secret + + TWITCH_CS + "&grant_type=client_credentials" ) @@ -95,70 +101,79 @@ def time_calc(input_time): twitch_0auth_json = twitch_0auth_response.json() twitch_access_token = twitch_0auth_json["access_token"] except ConnectionError as e: - print("Connection Error:", e) + print(f"Connection Error: {e}") print("Retrying in 10 seconds") + print(" | 🔑 Token Authorized!") - # Print Response to Serial - debug_bearer_response = ( - False # STREAMER WARNING: your client secret will be viewable - ) - if debug_bearer_response: - print("JSON Dump: ", twitch_0auth_json) - print("Header: ", twitch_0auth_header) - print("Access Token: ", twitch_access_token) + # STREAMER WARNING: your client secret will be viewable + if DEBUG: + print(f"JSON Dump: {twitch_0auth_json}") + print(f"Header: {twitch_0auth_header}") + print(f"Access Token: {twitch_access_token}") twitch_token_type = twitch_0auth_json["token_type"] - print("Token Type: ", twitch_token_type) + print(f"Token Type: {twitch_token_type}") - print("Board Uptime: ", time_calc(time.monotonic())) twitch_token_expiration = twitch_0auth_json["expires_in"] - print("Token Expires in: ", time_calc(twitch_token_expiration)) + print(f" | Token Expires in: {time_calc(twitch_token_expiration)}") - # ----------------------------- GET DATA ------------------------------------- + # ----------------------------- GET DATA -------------------- # Bearer token is refreshed every time script runs :) # Twitch sets token expiration to about 64 days # Helix is the name of the current Twitch API # Now that we have POST bearer token we can do a GET for data - # ---------------------------------------------------------------------------- + # ----------------------------------------------------------- twitch_header = { "Authorization": "Bearer " + twitch_access_token + "", - "Client-Id": "" + twitch_client_id + "", + "Client-Id": "" + TWITCH_CID + "", } TWITCH_FOLLOWERS_SOURCE = ( "https://api.twitch.tv/helix/channels" + "/followers?" + "broadcaster_id=" - + twitch_user_id + + TWITCH_UID ) - print( - "\nAttempting to GET TWITCH Stats!" - ) # ------------------------------------------------ - print("===============================") - twitch_followers_response = requests.get( + print(" | Attempting to GET Twitch JSON!") + twitch_response = requests.get( url=TWITCH_FOLLOWERS_SOURCE, headers=twitch_header ) try: - twitch_followers_json = twitch_followers_response.json() + twitch_json = twitch_response.json() except ConnectionError as e: - print("Connection Error:", e) + print(f"Connection Error: {e}") print("Retrying in 10 seconds") - # Print Response to Serial - debug_bearer_response = ( - False # STREAMER WARNING: your bearer token will be viewable - ) - if debug_bearer_response: - print("Full API GET URL: ", TWITCH_FOLLOWERS_SOURCE) - print("Header: ", twitch_header) - print("JSON Full Response: ", twitch_followers_json) - - twitch_followers = twitch_followers_json["total"] - print("Followers: ", twitch_followers) - print("Finished!") - print("Next Update in: ", time_calc(sleep_time)) + if DEBUG: + print(f" | Full API GET URL: {TWITCH_FOLLOWERS_SOURCE}") + print(f" | Header: {twitch_header}") + print(f" | JSON Full Response: {twitch_json}") + + if "status" in twitch_json: + twitch_error_status = twitch_json["status"] + print(f"❌ Status: {twitch_error_status}") + + if "error" in twitch_json: + twitch_error = twitch_json["error"] + print(f"❌ Error: {twitch_error}") + + if "message" in twitch_json: + twitch_error_msg = twitch_json["message"] + print(f"❌ Message: {twitch_error_msg}") + + if "total" in twitch_json: + print(" | ✅ Twitch JSON!") + twitch_followers = twitch_json["total"] + print(f" | | Followers: {twitch_followers}") + + twitch_response.close() + print("✂️ Disconnected from Twitch API") + + print("\nFinished!") + print(f"Board Uptime: {time_calc(time.monotonic())}") + print(f"Next Update: {time_calc(SLEEP_TIME)}") print("===============================") except (ValueError, RuntimeError) as e: - print("Failed to get data, retrying\n", e) + print(f"Failed to get data, retrying\n {e}") time.sleep(60) - continue - time.sleep(sleep_time) + break + time.sleep(SLEEP_TIME) diff --git a/examples/wifi/expanded/requests_wifi_api_twitter.py b/examples/wifi/expanded/requests_wifi_api_twitter.py deleted file mode 100644 index 4dcdfa6..0000000 --- a/examples/wifi/expanded/requests_wifi_api_twitter.py +++ /dev/null @@ -1,120 +0,0 @@ -# SPDX-FileCopyrightText: 2022 DJDevon3 for Adafruit Industries -# SPDX-License-Identifier: MIT -# Coded for Circuit Python 8.0 -"""DJDevon3 Adafruit Feather ESP32-S2 Twitter_API_Example""" -import gc -import json -import os -import ssl -import time - -import socketpool -import wifi - -import adafruit_requests - -# Twitter developer account bearer token required. -# Ensure these are uncommented and in secrets.py or .env -# "TW_userid": "Your Twitter user id", # numerical id not username -# "TW_bearer_token": "Your long API Bearer token", - -# Initialize WiFi Pool (There can be only 1 pool & top of script) -pool = socketpool.SocketPool(wifi.radio) - -# Time between API refreshes -# 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour -sleep_time = 900 - -# Get WiFi details, ensure these are setup in settings.toml -ssid = os.getenv("CIRCUITPY_WIFI_SSID") -password = os.getenv("CIRCUITPY_WIFI_PASSWORD") -tw_userid = os.getenv("TW_userid") -tw_bearer_token = os.getenv("TW_bearer_token") - -if sleep_time < 60: - sleep_time_conversion = "seconds" - sleep_int = sleep_time -elif 60 <= sleep_time < 3600: - sleep_int = sleep_time / 60 - sleep_time_conversion = "minutes" -elif 3600 <= sleep_time < 86400: - sleep_int = sleep_time / 60 / 60 - sleep_time_conversion = "hours" -else: - sleep_int = sleep_time / 60 / 60 / 24 - sleep_time_conversion = "days" - -# Used with any Twitter 0auth request. -twitter_header = {"Authorization": "Bearer " + tw_bearer_token} -TW_SOURCE = ( - "https://api.twitter.com/2/users/" - + tw_userid - + "?user.fields=public_metrics,created_at,pinned_tweet_id" - + "&expansions=pinned_tweet_id" - + "&tweet.fields=created_at,public_metrics,source,context_annotations,entities" -) - -# Connect to Wi-Fi -print("\n===============================") -print("Connecting to WiFi...") -requests = adafruit_requests.Session(pool, ssl.create_default_context()) -while not wifi.radio.ipv4_address: - try: - wifi.radio.connect(ssid, password) - except ConnectionError as e: - print("Connection Error:", e) - print("Retrying in 10 seconds") - time.sleep(10) - gc.collect() -print("Connected!\n") - -while True: - try: - print("\nAttempting to GET Twitter Stats!") # -------------------------------- - debug_request = False # Set true to see full request - if debug_request: - print("Full API GET URL: ", TW_SOURCE) - print("===============================") - try: - twitter_response = requests.get(url=TW_SOURCE, headers=twitter_header) - tw_json = twitter_response.json() - except ConnectionError as e: - print("Connection Error:", e) - print("Retrying in 10 seconds") - - # Print Full JSON to Serial - debug_response = False # Set true to see full response - if debug_response: - dump_object = json.dumps(tw_json) - print("JSON Dump: ", dump_object) - - # Print to Serial - tw_debug_keys = True # Set true to print Serial data - if tw_debug_keys: - tw_userid = tw_json["data"]["id"] - print("User ID: ", tw_userid) - - tw_username = tw_json["data"]["name"] - print("Name: ", tw_username) - - tw_join_date = tw_json["data"]["created_at"] - print("Member Since: ", tw_join_date) - - tw_tweets = tw_json["data"]["public_metrics"]["tweet_count"] - print("Tweets: ", tw_tweets) - - tw_followers = tw_json["data"]["public_metrics"]["followers_count"] - print("Followers: ", tw_followers) - - print("Monotonic: ", time.monotonic()) - - print("\nFinished!") - print("Next Update in %s %s" % (int(sleep_int), sleep_time_conversion)) - print("===============================") - gc.collect() - - except (ValueError, RuntimeError) as e: - print("Failed to get data, retrying\n", e) - time.sleep(60) - continue - time.sleep(sleep_time) diff --git a/examples/wifi/expanded/requests_wifi_api_youtube.py b/examples/wifi/expanded/requests_wifi_api_youtube.py index e9bc6a2..c69c146 100644 --- a/examples/wifi/expanded/requests_wifi_api_youtube.py +++ b/examples/wifi/expanded/requests_wifi_api_youtube.py @@ -1,123 +1,120 @@ -# SPDX-FileCopyrightText: 2022 DJDevon3 for Adafruit Industries +# SPDX-FileCopyrightText: 2024 DJDevon3 # SPDX-License-Identifier: MIT -# Coded for Circuit Python 8.0 -"""DJDevon3 Adafruit Feather ESP32-S2 YouTube_API_Example""" -import gc -import json +# Coded for Circuit Python 8.2.x +"""YouTube API Subscriber Count Example""" + import os -import ssl import time -import socketpool +import adafruit_connection_manager import wifi import adafruit_requests -# Ensure these are uncommented and in secrets.py or .env -# "YT_username": "Your YouTube Username", -# "YT_token" : "Your long API developer token", - -# Initialize WiFi Pool (There can be only 1 pool & top of script) -pool = socketpool.SocketPool(wifi.radio) +# Initalize Wifi, Socket Pool, Request Session +pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio) +ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio) +requests = adafruit_requests.Session(pool, ssl_context) -# Time between API refreshes +# API Polling Rate # 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour -sleep_time = 900 +SLEEP_TIME = 900 + +# Set debug to True for full JSON response. +# WARNING: Will show credentials +DEBUG = False + +# Ensure these are uncommented and in settings.toml +# YOUTUBE_USERNAME = "Your YouTube Username", +# YOUTUBE_TOKEN = "Your long API developer token", # Get WiFi details, ensure these are setup in settings.toml ssid = os.getenv("CIRCUITPY_WIFI_SSID") password = os.getenv("CIRCUITPY_WIFI_PASSWORD") -yt_username = os.getenv("YT_username") -yt_token = os.getenv("YT_token") - - -if sleep_time < 60: - sleep_time_conversion = "seconds" - sleep_int = sleep_time -elif 60 <= sleep_time < 3600: - sleep_int = sleep_time / 60 - sleep_time_conversion = "minutes" -elif 3600 <= sleep_time < 86400: - sleep_int = sleep_time / 60 / 60 - sleep_time_conversion = "hours" -else: - sleep_int = sleep_time / 60 / 60 / 24 - sleep_time_conversion = "days" +# Requires YouTube/Google API key +# https://console.cloud.google.com/apis/dashboard +YT_USERNAME = os.getenv("YOUTUBE_USERNAME") +YT_TOKEN = os.getenv("YOUTUBE_TOKEN") + + +def time_calc(input_time): + """Converts seconds to minutes/hours/days""" + if input_time < 60: + return f"{input_time:.0f} seconds" + if input_time < 3600: + return f"{input_time / 60:.0f} minutes" + if input_time < 86400: + return f"{input_time / 60 / 60:.0f} hours" + return f"{input_time / 60 / 60 / 24:.1f} days" + # https://youtube.googleapis.com/youtube/v3/channels?part=statistics&forUsername=[YOUR_USERNAME]&key=[YOUR_API_KEY] -YT_SOURCE = ( - "https://youtube.googleapis.com/youtube/v3/channels?" - + "part=statistics" - + "&forUsername=" - + yt_username +YOUTUBE_SOURCE = ( + "https://youtube.googleapis.com/youtube/v3/channels?part=statistics&forUsername=" + + str(YT_USERNAME) + "&key=" - + yt_token + + str(YT_TOKEN) ) -# Connect to Wi-Fi -print("\n===============================") -print("Connecting to WiFi...") -requests = adafruit_requests.Session(pool, ssl.create_default_context()) -while not wifi.radio.ipv4_address: - try: - wifi.radio.connect(ssid, password) - except ConnectionError as e: - print("Connection Error:", e) - print("Retrying in 10 seconds") - time.sleep(10) - gc.collect() -print("Connected!\n") - while True: + # Connect to Wi-Fi + print("\nConnecting to WiFi...") + while not wifi.radio.ipv4_address: + try: + wifi.radio.connect(ssid, password) + except ConnectionError as e: + print("❌ Connection Error:", e) + print("Retrying in 10 seconds") + print("✅ Wifi!") try: - print("Attempting to GET YouTube Stats!") # ---------------------------------- - debug_request = False # Set true to see full request - if debug_request: - print("Full API GET URL: ", YT_SOURCE) - print("===============================") + print(" | Attempting to GET YouTube JSON...") try: - response = requests.get(YT_SOURCE).json() + youtube_response = requests.get(url=YOUTUBE_SOURCE) + youtube_json = youtube_response.json() except ConnectionError as e: print("Connection Error:", e) print("Retrying in 10 seconds") + print(" | ✅ YouTube JSON!") + + if DEBUG: + print(f" | Full API GET URL: {YOUTUBE_SOURCE}") + print(f" | Full API Dump: {youtube_json}") - # Print Full JSON to Serial - debug_response = False # Set true to see full response - if debug_response: - dump_object = json.dumps(response) - print("JSON Dump: ", dump_object) + # Key:Value RESPONSES + if "pageInfo" in youtube_json: + totalResults = youtube_json["pageInfo"]["totalResults"] + print(f" | | Matching Results: {totalResults}") - # Print to Serial - yt_debug_keys = True # Set to True to print Serial data - if yt_debug_keys: - print("Matching Results: ", response["pageInfo"]["totalResults"]) + if "items" in youtube_json: + YT_request_kind = youtube_json["items"][0]["kind"] + print(f" | | Request Kind: {YT_request_kind}") - YT_request_kind = response["items"][0]["kind"] - print("Request Kind: ", YT_request_kind) + YT_channel_id = youtube_json["items"][0]["id"] + print(f" | | Channel ID: {YT_channel_id}") - YT_response_kind = response["kind"] - print("Response Kind: ", YT_response_kind) + YT_videoCount = youtube_json["items"][0]["statistics"]["videoCount"] + print(f" | | Videos: {YT_videoCount}") - YT_channel_id = response["items"][0]["id"] - print("Channel ID: ", YT_channel_id) + YT_viewCount = youtube_json["items"][0]["statistics"]["viewCount"] + print(f" | | Views: {YT_viewCount}") - YT_videoCount = response["items"][0]["statistics"]["videoCount"] - print("Videos: ", YT_videoCount) + YT_subsCount = youtube_json["items"][0]["statistics"]["subscriberCount"] + print(f" | | Subscribers: {YT_subsCount}") - YT_viewCount = response["items"][0]["statistics"]["viewCount"] - print("Views: ", YT_viewCount) + if "kind" in youtube_json: + YT_response_kind = youtube_json["kind"] + print(f" | | Response Kind: {YT_response_kind}") - YT_subsCount = response["items"][0]["statistics"]["subscriberCount"] - print("Subscribers: ", YT_subsCount) - print("Monotonic: ", time.monotonic()) + youtube_response.close() + print("✂️ Disconnected from YouTube API") print("\nFinished!") - print("Next Update in %s %s" % (int(sleep_int), sleep_time_conversion)) + print(f"Board Uptime: {time_calc(time.monotonic())}") + print(f"Next Update: {time_calc(SLEEP_TIME)}") print("===============================") - gc.collect() except (ValueError, RuntimeError) as e: - print("Failed to get data, retrying\n", e) + print(f"Failed to get data, retrying\n {e}") time.sleep(60) - continue - time.sleep(sleep_time) + break + time.sleep(SLEEP_TIME) diff --git a/examples/wifi/expanded/requests_wifi_multiple_cookies.py b/examples/wifi/expanded/requests_wifi_multiple_cookies.py index 36e4616..9237903 100644 --- a/examples/wifi/expanded/requests_wifi_multiple_cookies.py +++ b/examples/wifi/expanded/requests_wifi_multiple_cookies.py @@ -1,36 +1,37 @@ # SPDX-FileCopyrightText: 2022 Alec Delaney # SPDX-License-Identifier: MIT - -""" -This example was written for the MagTag; changes may be needed -for connecting to the internet depending on your device. -""" +# Coded for Circuit Python 9.0 +""" Multiple Cookies Example written for MagTag """ import os -import ssl -import socketpool +import adafruit_connection_manager import wifi import adafruit_requests -COOKIE_TEST_URL = "https://www.adafruit.com" - # Get WiFi details, ensure these are setup in settings.toml ssid = os.getenv("CIRCUITPY_WIFI_SSID") password = os.getenv("CIRCUITPY_WIFI_PASSWORD") -# Connect to the Wi-Fi network -print("Connecting to %s" % ssid) -wifi.radio.connect(ssid, password) +COOKIE_TEST_URL = "https://www.adafruit.com" -# Set up the requests library -pool = socketpool.SocketPool(wifi.radio) -requests = adafruit_requests.Session(pool, ssl.create_default_context()) +# Initalize Wifi, Socket Pool, Request Session +pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio) +ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio) +requests = adafruit_requests.Session(pool, ssl_context) -# GET from the URL -print("Fetching multiple cookies from", COOKIE_TEST_URL) -response = requests.get(COOKIE_TEST_URL) +print(f"\nConnecting to {ssid}...") +try: + # Connect to the Wi-Fi network + wifi.radio.connect(ssid, password) + # URL GET Request + response = requests.get(COOKIE_TEST_URL) +except OSError as e: + print(f"❌ OSError: {e}") +print("✅ Wifi!") + +print(f" | Fetching Cookies: {COOKIE_TEST_URL}") # Spilt up the cookies by ", " elements = response.headers["set-cookie"].split(", ") @@ -49,10 +50,13 @@ cookie_list.append(element) # Pring the information about the cookies -print("Number of cookies:", len(cookie_list)) -print("") -print("Cookies received:") -print("-" * 40) +print(f" | Total Cookies: {len(cookie_list)}") +print("-" * 80) + for cookie in cookie_list: - print(cookie) - print("-" * 40) + print(f" | 🍪 {cookie}") + print("-" * 80) + +response.close() +print(f"✂️ Disconnected from {COOKIE_TEST_URL}") +print("Finished!") diff --git a/examples/wifi/requests_wifi_simpletest.py b/examples/wifi/requests_wifi_simpletest.py index 35b835a..21492fb 100644 --- a/examples/wifi/requests_wifi_simpletest.py +++ b/examples/wifi/requests_wifi_simpletest.py @@ -1,10 +1,11 @@ # SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries # SPDX-License-Identifier: MIT +# Updated for Circuit Python 9.0 +""" WiFi Simpletest """ import os -import ssl -import socketpool +import adafruit_connection_manager import wifi import adafruit_requests @@ -13,60 +14,57 @@ ssid = os.getenv("CIRCUITPY_WIFI_SSID") password = os.getenv("CIRCUITPY_WIFI_PASSWORD") -# Initialize WiFi Pool (There can be only 1 pool & top of script) -radio = wifi.radio -pool = socketpool.SocketPool(radio) - -print("Connecting to AP...") -while not wifi.radio.ipv4_address: - try: - wifi.radio.connect(ssid, password) - except ConnectionError as e: - print("could not connect to AP, retrying: ", e) -print("Connected to", str(radio.ap_info.ssid, "utf-8"), "\tRSSI:", radio.ap_info.rssi) - -# Initialize a requests session -ssl_context = ssl.create_default_context() -requests = adafruit_requests.Session(pool, ssl_context) - TEXT_URL = "http://wifitest.adafruit.com/testwifi/index.html" JSON_GET_URL = "https://httpbin.org/get" JSON_POST_URL = "https://httpbin.org/post" -print("Fetching text from %s" % TEXT_URL) +# Initalize Wifi, Socket Pool, Request Session +pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio) +ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio) +requests = adafruit_requests.Session(pool, ssl_context) +rssi = wifi.radio.ap_info.rssi + +print(f"\nConnecting to {ssid}...") +print(f"Signal Strength: {rssi}") +try: + # Connect to the Wi-Fi network + wifi.radio.connect(ssid, password) +except OSError as e: + print(f"❌ OSError: {e}") +print("✅ Wifi!") + +print(f" | GET Text Test: {TEXT_URL}") response = requests.get(TEXT_URL) -print("-" * 40) - -print("Text Response: ", response.text) -print("-" * 40) +print(f" | ✅ GET Response: {response.text}") response.close() +print(f" | ✂️ Disconnected from {TEXT_URL}") +print("-" * 80) -print("Fetching JSON data from %s" % JSON_GET_URL) +print(f" | GET Full Response Test: {JSON_GET_URL}") response = requests.get(JSON_GET_URL) -print("-" * 40) - -print("JSON Response: ", response.json()) -print("-" * 40) +print(f" | ✅ Unparsed Full JSON Response: {response.json()}") response.close() +print(f" | ✂️ Disconnected from {JSON_GET_URL}") +print("-" * 80) -data = "31F" -print("POSTing data to {0}: {1}".format(JSON_POST_URL, data)) -response = requests.post(JSON_POST_URL, data=data) -print("-" * 40) - +DATA = "This is an example of a JSON value" +print(f" | ✅ JSON 'value' POST Test: {JSON_POST_URL} {DATA}") +response = requests.post(JSON_POST_URL, data=DATA) json_resp = response.json() # Parse out the 'data' key from json_resp dict. -print("Data received from server:", json_resp["data"]) -print("-" * 40) +print(f" | ✅ JSON 'value' Response: {json_resp['data']}") response.close() +print(f" | ✂️ Disconnected from {JSON_POST_URL}") +print("-" * 80) -json_data = {"Date": "July 25, 2019"} -print("POSTing data to {0}: {1}".format(JSON_POST_URL, json_data)) +json_data = {"Date": "January 1, 1970"} +print(f" | ✅ JSON 'key':'value' POST Test: {JSON_POST_URL} {json_data}") response = requests.post(JSON_POST_URL, json=json_data) -print("-" * 40) - json_resp = response.json() # Parse out the 'json' key from json_resp dict. -print("JSON Data received from server:", json_resp["json"]) -print("-" * 40) +print(f" | ✅ JSON 'key':'value' Response: {json_resp['json']}") response.close() +print(f" | ✂️ Disconnected from {JSON_POST_URL}") +print("-" * 80) + +print("Finished!")