Skip to content

Commit

Permalink
Update Open-Sky Network Single Flight API Example with Connection Man…
Browse files Browse the repository at this point in the history
…ager
  • Loading branch information
DJDevon3 committed Mar 18, 2024
1 parent 7106d2f commit 6f63caa
Showing 1 changed file with 160 additions and 115 deletions.
275 changes: 160 additions & 115 deletions examples/wifi/expanded/requests_wifi_api_openskynetwork_private.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
# SPDX-FileCopyrightText: 2023 DJDevon3
# SPDX-FileCopyrightText: 2024 DJDevon3
# SPDX-License-Identifier: MIT
# Coded for Circuit Python 8.1
# DJDevon3 ESP32-S3 OpenSkyNetwork_Private_API_Example
# Coded for Circuit Python 8.2.x
"""OpenSky-Network.org Private API Example"""
# pylint: disable=import-error

import json
import os
import ssl
import time

import circuitpython_base64 as base64
import socketpool
import adafruit_connection_manager
import wifi
from adafruit_binascii import b2a_base64

import adafruit_requests

Expand All @@ -19,135 +18,181 @@
# All active flights JSON: https://opensky-network.org/api/states/all # PICK ONE! :)
# JSON order: transponder, callsign, country
# ACTIVE transpondes only, for multiple "c822af&icao24=cb3993&icao24=c63923"
transponder = "7c6b2d"
TRANSPONDER = "471efd"

# Initialize WiFi Pool (There can be only 1 pool & top of script)
pool = socketpool.SocketPool(wifi.radio)
# Github developer token required.
username = os.getenv("GITHUB_USERNAME")
token = os.getenv("GITHUB_TOKEN")

# Time between API refreshes
# Get WiFi details, ensure these are setup in settings.toml
ssid = os.getenv("CIRCUITPY_WIFI_SSID")
password = os.getenv("CIRCUITPY_WIFI_PASSWORD")
osnusername = os.getenv("OSN_USERNAME") # Website Credentials
osnpassword = os.getenv("OSN_PASSWORD") # Website Credentials

# API Polling Rate
# 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour
# OpenSky-Networks IP bans for too many requests, check rate limit.
# https://openskynetwork.github.io/opensky-api/rest.html#limitations
sleep_time = 1800
SLEEP_TIME = 1800

# Get WiFi details, ensure these are setup in settings.toml
ssid = os.getenv("CIRCUITPY_WIFI_SSID")
password = os.getenv("CIRCUITPY_WIFI_PASSWORD")
osnu = os.getenv("OSN_Username")
osnp = os.getenv("OSN_Password")

osn_cred = str(osnu) + ":" + str(osnp)
bytes_to_encode = b" " + str(osn_cred) + " "
base64_string = base64.encodebytes(bytes_to_encode)
base64cred = repr(base64_string)[2:-1]

Debug_Auth = False # STREAMER WARNING this will show your credentials!
if Debug_Auth:
osn_cred = str(osnu) + ":" + str(osnp)
bytes_to_encode = b" " + str(osn_cred) + " "
print(repr(bytes_to_encode))
base64_string = base64.encodebytes(bytes_to_encode)
print(repr(base64_string)[2:-1])
base64cred = repr(base64_string)[2:-1]
print("Decoded Bytes:", str(base64cred))
# Set debug to True for full JSON response.
# WARNING: makes credentials visible
DEBUG = False

# Initalize Wifi, Socket Pool, Request Session
pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio)
requests = adafruit_requests.Session(pool, ssl_context)

# -- Base64 Conversion --
OSN_CREDENTIALS = str(osnusername) + ":" + str(osnpassword)
OSN_CREDENTIALS_B = b"" + str(OSN_CREDENTIALS) + ""
BASE64_ASCII = b2a_base64(OSN_CREDENTIALS_B)
BASE64_STRING = str(BASE64_ASCII) # bytearray
TRUNCATED_BASE64_STRING = BASE64_STRING[2:-1] # truncate bytearray head/tail

if DEBUG:
print("Original Binary Data: ", OSN_CREDENTIALS_B)
print("Base64 ByteArray: ", BASE64_ASCII)
print(f"Base64 String: {TRUNCATED_BASE64_STRING}")

# Requests URL - icao24 is their endpoint required for a transponder
# example https://opensky-network.org/api/states/all?icao24=a808c5
# OSN private requires your username:password to be base64 encoded
osn_header = {"Authorization": "Basic " + str(base64cred)}
OPENSKY_SOURCE = "https://opensky-network.org/api/states/all?" + "icao24=" + transponder
# OSN private: requires your website username:password to be base64 encoded
OSN_HEADER = {"Authorization": "Basic " + str(TRUNCATED_BASE64_STRING)}
OPENSKY_SOURCE = "https://opensky-network.org/api/states/all?" + "icao24=" + TRANSPONDER


# Converts seconds to human readable minutes/hours/days
def time_calc(input_time): # input_time in seconds
def time_calc(input_time):
"""Converts seconds to minutes/hours/days"""
if input_time < 60:
sleep_int = input_time
time_output = f"{sleep_int:.0f} seconds"
elif 60 <= input_time < 3600:
sleep_int = input_time / 60
time_output = f"{sleep_int:.0f} minutes"
elif 3600 <= input_time < 86400:
sleep_int = input_time / 60 / 60
time_output = f"{sleep_int:.1f} hours"
else:
sleep_int = input_time / 60 / 60 / 24
time_output = f"{sleep_int:.1f} days"
return time_output
return f"{input_time:.0f} seconds"
if input_time < 3600:
return f"{input_time / 60:.0f} minutes"
if input_time < 86400:
return f"{input_time / 60 / 60:.0f} hours"
return f"{input_time / 60 / 60 / 24:.1f} days"


def _format_datetime(datetime):
return "{:02}/{:02}/{} {:02}:{:02}:{:02}".format(
datetime.tm_mon,
datetime.tm_mday,
datetime.tm_year,
datetime.tm_hour,
datetime.tm_min,
datetime.tm_sec,
return (
f"{datetime.tm_mon:02}/"
+ f"{datetime.tm_mday:02}/"
+ f"{datetime.tm_year:02} "
+ f"{datetime.tm_hour:02}:"
+ f"{datetime.tm_min:02}:"
+ f"{datetime.tm_sec:02}"
)


# Connect to Wi-Fi
print("\n===============================")
print("Connecting to WiFi...")
request = adafruit_requests.Session(pool, ssl.create_default_context())
while not wifi.radio.ipv4_address:
while True:
# Connect to Wi-Fi
print("\nConnecting to WiFi...")
while not wifi.radio.ipv4_address:
try:
wifi.radio.connect(ssid, password)
except ConnectionError as e:
print("❌ Connection Error:", e)
print("Retrying in 10 seconds")
print("✅ Wifi!")

try:
wifi.radio.connect(ssid, password)
except ConnectionError as e:
print("Connection Error:", e)
print("Retrying in 10 seconds")
time.sleep(10)
print("Connected!\n")
print(" | Attempting to GET OpenSky-Network Single Flight JSON!")
try:
opensky_response = requests.get(url=OPENSKY_SOURCE, headers=OSN_HEADER)
opensky_json = opensky_response.json()
except ConnectionError as e:
print("Connection Error:", e)
print("Retrying in 10 seconds")

while True:
# STREAMER WARNING this will show your credentials!
debug_request = False # Set True to see full request
if debug_request:
print("Full API HEADER: ", str(osn_header))
print("Full API GET URL: ", OPENSKY_SOURCE)
print("===============================")
print(" | ✅ OpenSky-Network JSON!")

print("\nAttempting to GET OpenSky-Network Data!")
opensky_response = request.get(url=OPENSKY_SOURCE, headers=osn_header).json()
if DEBUG:
print("Full API GET URL: ", OPENSKY_SOURCE)
print(opensky_json)

# Print Full JSON to Serial (doesn't show credentials)
debug_response = False # Set True to see full response
if debug_response:
dump_object = json.dumps(opensky_response)
print("JSON Dump: ", dump_object)
# ERROR MESSAGE RESPONSES
if "timestamp" in opensky_json:
osn_timestamp = opensky_json["timestamp"]
print(f"❌ Timestamp: {osn_timestamp}")

# Key:Value Serial Debug (doesn't show credentials)
osn_debug_keys = True # Set True to print Serial data
if osn_debug_keys:
try:
osn_flight = opensky_response["time"]
print("Current Unix Time: ", osn_flight)

current_struct_time = time.localtime(osn_flight)
current_date = "{}".format(_format_datetime(current_struct_time))
print(f"Unix to Readable Time: {current_date}")

# Current flight data for single callsign (right now)
osn_single_flight_data = opensky_response["states"]

if osn_single_flight_data is not None:
print("Flight Data: ", osn_single_flight_data)
transponder = opensky_response["states"][0][0]
print("Transponder: ", transponder)
callsign = opensky_response["states"][0][1]
print("Callsign: ", callsign)
country = opensky_response["states"][0][2]
print("Flight Country: ", country)
if "message" in opensky_json:
osn_message = opensky_json["message"]
print(f"❌ Message: {osn_message}")

if "error" in opensky_json:
osn_error = opensky_json["error"]
print(f"❌ Error: {osn_error}")

if "path" in opensky_json:
osn_path = opensky_json["path"]
print(f"❌ Path: {osn_path}")

if "status" in opensky_json:
osn_status = opensky_json["status"]
print(f"❌ Status: {osn_status}")

# Current flight data for single callsign (right now)
osn_single_flight_data = opensky_json["states"]

if osn_single_flight_data is not None:
if DEBUG:
print(f" | | Single Flight Data: {osn_single_flight_data}")

last_contact = opensky_json["states"][0][4]
# print(f" | | Last Contact Unix Time: {last_contact}")
lc_struct_time = time.localtime(last_contact)
lc_readable_time = f"{_format_datetime(lc_struct_time)}"
print(f" | | Last Contact: {lc_readable_time}")

flight_transponder = opensky_json["states"][0][0]
print(f" | | Transponder: {flight_transponder}")

callsign = opensky_json["states"][0][1]
print(f" | | Callsign: {callsign}")

squawk = opensky_json["states"][0][14]
print(f" | | Squawk: {squawk}")

country = opensky_json["states"][0][2]
print(f" | | Origin: {country}")

longitude = opensky_json["states"][0][5]
print(f" | | Longitude: {longitude}")

latitude = opensky_json["states"][0][6]
print(f" | | Latitude: {latitude}")

# Return Air Flight data if not on ground
on_ground = opensky_json["states"][0][8]
if on_ground is True:
print(f" | | On Ground: {on_ground}")
else:
print("Flight has no active data or you're polling too fast.")

print("\nFinished!")
print("Board Uptime: ", time_calc(time.monotonic()))
print("Next Update: ", time_calc(sleep_time))
time.sleep(sleep_time)
print("===============================")

except (ConnectionError, ValueError, NameError) as e:
print("OSN Connection Error:", e)
print("Next Retry: ", time_calc(sleep_time))
time.sleep(sleep_time)
altitude = opensky_json["states"][0][7]
print(f" | | Barometric Altitude: {altitude}")

velocity = opensky_json["states"][0][9]
if velocity != "null":
print(f" | | Velocity: {velocity}")

vertical_rate = opensky_json["states"][0][11]
if vertical_rate != "null":
print(f" | | Vertical Rate: {vertical_rate}")

else:
print(" | | ❌ Flight has no active data or you're polling too fast.")

opensky_response.close()
print("✂️ Disconnected from OpenSky-Network API")

print("\nFinished!")
print(f"Board Uptime: {time_calc(time.monotonic())}")
print(f"Next Update: {time_calc(SLEEP_TIME)}")
print("===============================")

except (ValueError, RuntimeError) as e:
print(f"Failed to get data, retrying\n {e}")
time.sleep(60)
break
time.sleep(SLEEP_TIME)

0 comments on commit 6f63caa

Please sign in to comment.