This script interfaces with a USB device to read NMEA data, process it, and publish it to an MQTT broker. It also periodically clears the data log file to avoid large file sizes.
- Function to Empty NMEA Data File
- Function to Send Command to USB Device
- Function to Send Keep-Alive Signal
- Function to Convert USB Data to NMEA Format
- MQTT Callbacks
- Main Program Execution
This function clears the contents of the specified file every 30 seconds.
def empty_file_every_30_seconds(file_path):
    """Truncate the file at ``file_path`` every 30 seconds, forever.

    The loop never exits, so this is meant to be run on its own thread.

    Args:
        file_path (str): Path of the file to clear.
    """
    pause_seconds = 30
    while True:
        time.sleep(pause_seconds)
        # Opening in "w+" mode already truncates the file; the empty write
        # only makes the intent explicit.
        with open(file_path, "w+") as log_file:
            log_file.write("")
        print("File has been emptied")
- file_path (str): The path to the file that needs to be cleared.
# Run the periodic file cleaner in the background. daemon=True lets the
# interpreter exit without waiting for the endless loop to finish.
threading.Thread(
    target=empty_file_every_30_seconds,
    args=("nmea_output.txt",),
    daemon=True,
).start()
This function sends a command to the specified USB device.
def send_command(dev, interface, endpoint_address, command):
    """Write a carriage-return-terminated command to a USB endpoint.

    Args:
        dev (usb.core.Device): Device to write to.
        interface (int): Interface number; used only in the log messages.
        endpoint_address (int): Endpoint address the command is written to.
        command (str): Command text, without the trailing carriage return.

    A ``usb.core.USBError`` raised by the write is reported on stdout and
    not re-raised.
    """
    payload = (command + "\r").encode("utf-8")
    label = command.strip()  # whitespace-free form shown in the log lines
    try:
        dev.write(endpoint_address, payload)
    except usb.core.USBError as e:
        print(f'Interface {interface}: Error sending command "{label}": {e}')
    else:
        print(f'Interface {interface}: Command "{label}" sent')
- dev (usb.core.Device): The USB device object.
- interface (int): The interface number of the USB device.
- endpoint_address (int): The endpoint address for sending data.
- command (str): The command string to send to the device.
# Example: send the "RAW 1" command to the device's OUT endpoint.
send_command(
    dev=dev,
    interface=interface,
    endpoint_address=endpoint_out,
    command="RAW 1",
)
This function sends a keep-alive signal to the USB device to maintain the connection.
def send_keep_alive(dev, endpoint_address):
    """Write the keep-alive packet to the device in an endless loop.

    One packet is written per iteration, followed by a one-second sleep.
    A failed write is reported on stdout and the loop carries on; the
    function never returns.

    Args:
        dev (usb.core.Device): Device to write to.
        endpoint_address (int): Endpoint address the packet is written to.
    """
    keep_alive_packet = b"\x4B\x41\x0A"  # ASCII "KA" followed by a line feed
    while True:
        try:
            dev.write(endpoint_address, keep_alive_packet)
        except usb.core.USBError as e:
            print(f"Error sending keep alive command: {e}")
        else:
            print("Keep alive command sent")
        time.sleep(1)
- dev (usb.core.Device): The USB device object.
- endpoint_address (int): The endpoint address for sending data.
# Send keep-alive packets from a background daemon thread so the calling
# code is free to do other work.
threading.Thread(
    target=send_keep_alive,
    args=(dev, endpoint_out),
    daemon=True,
).start()
This function converts raw USB data to NMEA formatted text.
def convert_to_nmea(data):
    """Turn raw USB byte values into text, discarding zero bytes.

    Args:
        data (iterable of int): Byte values read from the USB device.

    Returns:
        str or None: One character per non-zero value (via ``chr``), or
        ``None`` if the conversion raised. The error is printed to stdout.
    """
    try:
        characters = (chr(value) for value in data if value != 0)
        return "".join(characters)
    except Exception as e:
        print(f"Error converting data to NMEA: {e}")
        return None
- data (list of int): Raw data read from the USB device.
# Example: decode one chunk read from the device into text.
nmea_output = convert_to_nmea(data=data)
These are callback functions for handling MQTT events.
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: print whether the connection succeeded.

    Args:
        client: Passed in by the MQTT client library; not used here.
        userdata: Passed in by the MQTT client library; not used here.
        flags: Passed in by the MQTT client library; not used here.
        rc (int): Connection result code; 0 is treated as success.
    """
    if rc != 0:
        print(f"Failed to connect, return code {rc}\n")
        return
    print("Connected to MQTT Broker!")
def on_publish(client, userdata, mid):
    """MQTT publish callback: print the id of the published message.

    Args:
        client: Passed in by the MQTT client library; not used here.
        userdata: Passed in by the MQTT client library; not used here.
        mid: Message id reported for the publish.
    """
    notice = f"Message {mid} published."
    print(notice)
# Register the callbacks on the MQTT client (the two assignments are
# independent of each other).
client.on_publish = on_publish
client.on_connect = on_connect
The main program includes USB device setup, MQTT client setup, and the main loop for reading, processing, and publishing data.
# USB vendor/product IDs of the target device.
VENDOR_ID = 0x0403
PRODUCT_ID = 0xF241

# Interface number and endpoint addresses used for every transfer below.
interface = 0
endpoint_out = 0x02
endpoint_in = 0x81

dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
if dev is None:
    print("Device not found")
    sys.exit(1)

# Detach any kernel driver currently bound to the interface before the
# interface is claimed further down.
if dev.is_kernel_driver_active(interface):
    try:
        dev.detach_kernel_driver(interface)
    except usb.core.USBError as e:
        print(f"Could not detach kernel driver for interface {interface}: {e}")
        sys.exit(1)
    else:
        print(f"Kernel driver detached for interface {interface}")

try:
    dev.set_configuration()
except usb.core.USBError as e:
    print(f"Could not set configuration: {e}")
    sys.exit(1)
else:
    print("Device configuration set")

try:
    usb.util.claim_interface(dev, interface)
except usb.core.USBError as e:
    print(f"Could not claim interface {interface}: {e}")
    sys.exit(1)
else:
    print(f"Interface {interface} claimed")

# Send the initial "RAW 1" command, then start the background keep-alive
# sender as a daemon thread.
send_command(dev, interface, endpoint_out, "RAW 1")
threading.Thread(
    target=send_keep_alive,
    args=(dev, endpoint_out),
    daemon=True,
).start()
# MQTT connection settings.
broker = "broker.mqtt.cool"
port = 1883
topic = "NMEA_Lightning"
# The current Unix time is appended so each run gets a different client id.
client_id = f"python-mqtt-{int(time.time())}"

client = mqtt_client.Client(
    client_id=client_id,
    protocol=mqtt_client.MQTTv311,
    transport="tcp",
)
client.on_publish = on_publish
client.on_connect = on_connect

try:
    client.connect(broker, port=port, keepalive=60)
    client.loop_start()
except Exception as e:
    # Any failure to reach the broker is fatal for the script.
    print(f"Could not connect to MQTT broker: {e}")
    sys.exit(1)
file_path = "nmea_output.txt"
output_buffer = ""  # decoded text received so far that has not been flushed

# Main loop: read a chunk from the device, decode it, and once at least ten
# "$" markers are buffered, log and publish the complete "$"-delimited
# messages. Runs until interrupted with Ctrl+C.
try:
    while True:
        try:
            data = dev.read(endpoint_in, 64, timeout=5000)
        except usb.core.USBError as e:
            print(f"Interface {interface}: Error reading data: {e}")
        else:
            print(f"Data read from interface {interface}:")
            print(data)
            nmea_output = convert_to_nmea(data)
            if nmea_output:
                output_buffer += nmea_output
                if output_buffer.count("$") >= 10:
                    # Text before the last "$" is complete; text from the
                    # last "$" onward may still be growing, so it stays in
                    # the buffer for the next flush.
                    tail_start = output_buffer.rfind("$")
                    complete = output_buffer[:tail_start]
                    with open(file_path, "a") as log_file:
                        log_file.write(complete)
                    # Publish every "$"-to-"$" span. This has to happen
                    # before the buffer is trimmed: clearing the buffer
                    # first would leave nothing to publish.
                    first_marker = complete.find("$")
                    for body in complete[first_marker + 1:].split("$"):
                        client.publish(topic, "$" + body)
                    output_buffer = output_buffer[tail_start:]
        # Short pause before the next read attempt.
        time.sleep(0.2)
except KeyboardInterrupt:
    print("Interrupted by user")
finally:
    try:
        usb.util.release_interface(dev, interface)
        print(f"Interface {interface} released")
        try:
            dev.attach_kernel_driver(interface)
            print(f"Kernel driver reattached for interface {interface}")
        except usb.core.USBError as e:
            print(f"Interface {interface}: Could not reattach kernel driver: {e}")
    finally:
        # Always shut the MQTT client down, even if USB cleanup failed.
        client.loop_stop()
        client.disconnect()
- USB Device Identification and Setup: This part identifies the USB device and sets up the necessary configuration and interface claims.
- MQTT Client Setup: This part sets up the MQTT client, defines the broker and topic, and connects to the broker.
- Main Loop: Continuously reads data from the USB device, processes it, logs it to a file, and publishes it to the MQTT topic. If the script is interrupted, it releases the USB interface and disconnects from the MQTT broker.