Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Open-Sky Network Private Area API Example #168

Merged
merged 6 commits into from
Apr 10, 2024
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 136 additions & 120 deletions examples/wifi/expanded/requests_wifi_api_openskynetwork_private_area.py
Original file line number Diff line number Diff line change
@@ -1,155 +1,167 @@
# SPDX-FileCopyrightText: 2023 DJDevon3
# SPDX-FileCopyrightText: 2024 DJDevon3
# SPDX-License-Identifier: MIT
# Coded for Circuit Python 8.1
# DJDevon3 ESP32-S3 OpenSkyNetwork_Private_Area_API_Example
# Coded for Circuit Python 8.2.x
"""OpenSky-Network.org Private Area API Example"""
# pylint: disable=import-error

import json
import binascii
import os
import ssl
import time

import circuitpython_base64 as base64
import socketpool
import adafruit_connection_manager
import wifi

import adafruit_requests

# OpenSky-Network.org Website Login required for this API
# Increased call limit vs Public.
# REST API: https://openskynetwork.github.io/opensky-api/rest.html

# Retrieves all traffic within a geographic area (Orlando example)
latmin = "27.22" # east bounding box
latmax = "28.8" # west bounding box
lonmin = "-81.46" # north bounding box
lonmax = "-80.40" # south bounding box
LATMIN = "27.22" # east bounding box
LATMAX = "28.8" # west bounding box
LONMIN = "-81.46" # north bounding box
LONMAX = "-80.40" # south bounding box

# Initialize WiFi Pool (There can be only 1 pool & top of script)
pool = socketpool.SocketPool(wifi.radio)
# Get WiFi details, ensure these are setup in settings.toml
ssid = os.getenv("CIRCUITPY_WIFI_SSID")
password = os.getenv("CIRCUITPY_WIFI_PASSWORD")
osnusername = os.getenv("OSN_USERNAME") # Website Credentials
osnpassword = os.getenv("OSN_PASSWORD") # Website Credentials

# Time between API refreshes
# API Polling Rate
# 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour
# OpenSky-Networks IP bans for too many requests, check rate limit.
# https://openskynetwork.github.io/opensky-api/rest.html#limitations
sleep_time = 1800

# Get WiFi details, ensure these are setup in settings.toml
ssid = os.getenv("CIRCUITPY_WIFI_SSID")
password = os.getenv("CIRCUITPY_WIFI_PASSWORD")
# No token required, only website login
osnu = os.getenv("OSN_Username")
osnp = os.getenv("OSN_Password")

osn_cred = str(osnu) + ":" + str(osnp)
bytes_to_encode = b" " + str(osn_cred) + " "
base64_string = base64.encodebytes(bytes_to_encode)
base64cred = repr(base64_string)[2:-1]

Debug_Auth = False # STREAMER WARNING this will show your credentials!
if Debug_Auth:
osn_cred = str(osnu) + ":" + str(osnp)
bytes_to_encode = b" " + str(osn_cred) + " "
print(repr(bytes_to_encode))
base64_string = base64.encodebytes(bytes_to_encode)
print(repr(base64_string)[2:-1])
base64cred = repr(base64_string)[2:-1]
print("Decoded Bytes:", str(base64cred))

# OSN requires your username:password to be base64 encoded
# so technically it's not transmitted in the clear but w/e
osn_header = {"Authorization": "Basic " + str(base64cred)}

# Example request of all traffic over Florida, geographic areas cost less per call.
SLEEP_TIME = 1800

# Set debug to True for full JSON response.
# WARNING: makes credentials visible. based on how many flights
# in your area, full response could crash microcontroller
DEBUG = False

# Initalize Wifi, Socket Pool, Request Session
pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio)
requests = adafruit_requests.Session(pool, ssl_context)

# -- Base64 Conversion --
OSN_CREDENTIALS = str(osnusername) + ":" + str(osnpassword)
# base64 encode and strip appended \n from bytearray
OSN_CREDENTIALS_B = binascii.b2a_base64(b"" + str(OSN_CREDENTIALS)).strip()
BASE64_STRING = str(OSN_CREDENTIALS_B) # bytearray
SLICED_BASE64_STRING = BASE64_STRING[2:-1] # slice bytearray head/tail

if DEBUG:
print("Base64 ByteArray: ", BASE64_STRING)
print(f"Base64 Sliced String: {SLICED_BASE64_STRING}")

# Area requires OpenSky-Network.org username:password to be base64 encoded
OSN_HEADER = {"Authorization": "Basic " + str(SLICED_BASE64_STRING)}

# Example request of all traffic over Florida.
# Geographic areas calls cost less against the limit.
# https://opensky-network.org/api/states/all?lamin=25.21&lomin=-84.36&lamax=30.0&lomax=-78.40
OPENSKY_SOURCE = (
"https://opensky-network.org/api/states/all?"
+ "lamin="
+ latmin
+ LATMIN
+ "&lomin="
+ lonmin
+ LONMIN
+ "&lamax="
+ latmax
+ LATMAX
+ "&lomax="
+ lonmax
+ LONMAX
)


# Converts seconds to human readable minutes/hours/days
def time_calc(input_time): # input_time in seconds
def time_calc(input_time):
"""Converts seconds to minutes/hours/days"""
if input_time < 60:
sleep_int = input_time
time_output = f"{sleep_int:.0f} seconds"
elif 60 <= input_time < 3600:
sleep_int = input_time / 60
time_output = f"{sleep_int:.0f} minutes"
elif 3600 <= input_time < 86400:
sleep_int = input_time / 60 / 60
time_output = f"{sleep_int:.1f} hours"
else:
sleep_int = input_time / 60 / 60 / 24
time_output = f"{sleep_int:.1f} days"
return time_output
return f"{input_time:.0f} seconds"
if input_time < 3600:
return f"{input_time / 60:.0f} minutes"
if input_time < 86400:
return f"{input_time / 60 / 60:.0f} hours"
return f"{input_time / 60 / 60 / 24:.1f} days"


def _format_datetime(datetime):
return "{:02}/{:02}/{} {:02}:{:02}:{:02}".format(
datetime.tm_mon,
datetime.tm_mday,
datetime.tm_year,
datetime.tm_hour,
datetime.tm_min,
datetime.tm_sec,
"""F-String formatted struct time conversion"""
return (
f"{datetime.tm_mon:02}/"
+ f"{datetime.tm_mday:02}/"
+ f"{datetime.tm_year:02} "
+ f"{datetime.tm_hour:02}:"
+ f"{datetime.tm_min:02}:"
+ f"{datetime.tm_sec:02}"
)


# Connect to Wi-Fi
print("\n===============================")
print("Connecting to WiFi...")
request = adafruit_requests.Session(pool, ssl.create_default_context())
while not wifi.radio.ipv4_address:
while True:
# Connect to Wi-Fi
print("\nConnecting to WiFi...")
while not wifi.radio.ipv4_address:
try:
wifi.radio.connect(ssid, password)
except ConnectionError as e:
print("❌ Connection Error:", e)
print("Retrying in 10 seconds")
print("✅ Wifi!")

try:
wifi.radio.connect(ssid, password)
except ConnectionError as e:
print("Connection Error:", e)
print("Retrying in 10 seconds")
time.sleep(10)
print("Connected!\n")
print(" | Attempting to GET OpenSky-Network Area Flights JSON!")
try:
opensky_response = requests.get(url=OPENSKY_SOURCE, headers=OSN_HEADER)
opensky_json = opensky_response.json()
except ConnectionError as e:
print("Connection Error:", e)
print("Retrying in 10 seconds")

while True:
# STREAMER WARNING this will show your credentials!
debug_request = False # Set True to see full request
if debug_request:
print("Full API HEADER: ", str(osn_header))
print("Full API GET URL: ", OPENSKY_SOURCE)
print("===============================")
print(" | ✅ OpenSky-Network JSON!")

print("\nAttempting to GET OpenSky-Network Data!")
opensky_response = request.get(url=OPENSKY_SOURCE, headers=osn_header).json()
if DEBUG:
print("Full API GET URL: ", OPENSKY_SOURCE)
print(opensky_json)

# Print Full JSON to Serial (doesn't show credentials)
debug_response = False # Set True to see full response
if debug_response:
dump_object = json.dumps(opensky_response)
print("JSON Dump: ", dump_object)
# ERROR MESSAGE RESPONSES
if "timestamp" in opensky_json:
osn_timestamp = opensky_json["timestamp"]
print(f"❌ Timestamp: {osn_timestamp}")

# Key:Value Serial Debug (doesn't show credentials)
osn_debug_keys = True # Set True to print Serial data
if osn_debug_keys:
try:
osn_flight = opensky_response["time"]
print("Current Unix Time: ", osn_flight)
if "message" in opensky_json:
osn_message = opensky_json["message"]
print(f"❌ Message: {osn_message}")

if "error" in opensky_json:
osn_error = opensky_json["error"]
print(f"❌ Error: {osn_error}")

if "path" in opensky_json:
osn_path = opensky_json["path"]
print(f"❌ Path: {osn_path}")

if "status" in opensky_json:
osn_status = opensky_json["status"]
print(f"❌ Status: {osn_status}")

current_struct_time = time.localtime(osn_flight)
current_date = "{}".format(_format_datetime(current_struct_time))
print(f"Unix to Readable Time: {current_date}")
# Current flight data for single callsign (right now)
osn_all_flights = opensky_json["states"]

# Current flight data for single callsign (right now)
osn_all_flights = opensky_response["states"]
if osn_all_flights is not None:
if DEBUG:
print(f" | | Area Flights Full Response: {osn_all_flights}")

osn_time = opensky_json["time"]
# print(f" | | Last Contact Unix Time: {osn_time}")
osn_struct_time = time.localtime(osn_time)
osn_readable_time = f"{_format_datetime(osn_struct_time)}"
print(f" | | Timestamp: {osn_readable_time}")

if osn_all_flights is not None:
# print("Flight Data: ", osn_all_flights)
for flights in osn_all_flights:
osn_t = f"Trans:{flights[0]} "
osn_c = f"Sign:{flights[1]} "
osn_t = f" | | Trans:{flights[0]} "
osn_c = f"Sign:{flights[1]}"
osn_o = f"Origin:{flights[2]} "
osn_tm = f"Time:{flights[3]} "
osn_l = f"Last:{flights[4]} "
Expand All @@ -171,16 +183,20 @@ def _format_datetime(datetime):
string2 = f"{osn_la}{osn_ba}{osn_g}{osn_v}{osn_h}{osn_vr}"
string3 = f"{osn_s}{osn_ga}{osn_sq}{osn_pr}{osn_ps}{osn_ca}"
print(f"{string1}{string2}{string3}")
else:
print("Flight has no active data or you're polling too fast.")

print("\nFinished!")
print("Board Uptime: ", time_calc(time.monotonic()))
print("Next Update: ", time_calc(sleep_time))
time.sleep(sleep_time)
print("===============================")

except (ConnectionError, ValueError, NameError) as e:
print("OSN Connection Error:", e)
print("Next Retry: ", time_calc(sleep_time))
time.sleep(sleep_time)

else:
print(" | | ❌ Area has no active data or you're polling too fast.")

opensky_response.close()
print("✂️ Disconnected from OpenSky-Network API")

print("\nFinished!")
print(f"Board Uptime: {time_calc(time.monotonic())}")
print(f"Next Update: {time_calc(SLEEP_TIME)}")
print("===============================")

except (ValueError, RuntimeError) as e:
print(f"Failed to get data, retrying\n {e}")
time.sleep(60)
break
time.sleep(SLEEP_TIME)
Loading