Skip to content

Commit

Permalink
Add mqtt tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
felipengeletrica committed May 8, 2024
1 parent 6dbc421 commit cb4cfe1
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 5 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Like ESP WIFI modules, microcontrollers, Linux serial console etc.

With this program it's possible to save logs from different serial ports just by [configuring the JSON file](#-configuring) (`config.json`), the number of ports and unlimited and each process for processing and saving the logs is with you on separate instances and threads.


## 🚀 Run the application

create a virtual environment to isolate our package dependencies locally (first run time)
Expand Down Expand Up @@ -47,6 +48,16 @@ To find the tty port associated with your microcontroller using the `ls /dev/ser
4. Fill in the serialid parameter of the devices in the [`config.json`](config.json) file with the following path: `/dev/serial/by-path/<id-returned-by-previous-command>`

> By using this path to access the serial device instead of the port number (e.g., `/dev/ttyACM0`), the operating system will always point to the correct device, regardless of how many times the microcontroller is disconnected and reconnected.


| Interface | Descrição | Uso no Código |
|-------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Serial | Interface de comunicação serial usada para comunicação ponto a ponto entre dispositivos. | Verifica se a chave "serial" está presente na interface do dispositivo e, se sim, executa o processo associado à interface serial. |
| Serial-to-MQTT | Interface que converte dados recebidos pela porta serial em mensagens MQTT para comunicação em rede. | Verifica se a chave "serial-to-mqtt" está presente na interface do dispositivo e, se sim, executa o processo associado à interface serial para MQTT. |
| Bluetooth-GPS | Interface Bluetooth usada para conectar dispositivos GPS ao sistema. | Verifica se a chave "bluetooth-gps" está presente na interface do dispositivo e, se sim, executa o processo associado à interface Bluetooth-GPS. |
| Bluetooth-BLE | Interface Bluetooth Low Energy (BLE) usada para conectar dispositivos BLE ao sistema. | Verifica se a chave "bluetooth-BLE" está presente na interface do dispositivo e, se sim, executa o processo associado à interface Bluetooth-BLE. |
| Outros | Outras interfaces não especificadas, resultando em uma exceção para dispositivos inválidos. | Se nenhuma das interfaces especificadas for encontrada no dispositivo, uma exceção é lançada indicando que o dispositivo é inválido. |

Using file `config.json` for configuration for one or multiple serial ports:

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ bluepy
pygobject
dbus-python
bleak
paho-mqtt
paho-mqtt==1.6.1

131 changes: 131 additions & 0 deletions src/interface/mqtt_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#!/usr/bin/env python3
# coding=utf-8
"""
MQTT Manager
"""

# region import
import threading
import time
import paho.mqtt.client as mqtt
import json

# endregion


class MqttManager(threading.Thread):
def __init__(
self,
username: str,
password: str,
server: str,
port: int,
client: str,
subscribe: str,
message_callback=None,
):
"""
Instance MQTT connections
"""
threading.Thread.__init__(self)
self.server = server
self.port = port
self.client = mqtt.Client(client)
self.__subscribe = subscribe
self.client.username_pw_set(username=username, password=password)
self.thread = threading.Thread(target=self._start)
self.status = False
self.retry = 3

def _start(self):
"""
Start process mqtt manager
:rtype: object
"""
print(f"Server: {self.server} Port: {self.port}")
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message

while True:
try:
# Connect MQTT BROCKER
mqttt_status = self.client.connect(host=self.server, port=self.port)
if mqttt_status == 0:
self.status = True
else:
self.status = False
print("Running server")
# Block loop
self.client.loop_forever()

except Exception as error:
print(f"Fail Server: {self.server} Error:{error}")
self.status = False
# Add sleep in case down network
time.sleep(0.3)
pass

def publish(self, topic, payload):
"""
Publish message in broker
:param topic: Topic
:param payload: Payload
:return:
"""

(rc, mid) = (-1, -1)

try:
retry = self.retry
while retry > 0:
(rc, mid) = self.client.publish(topic=topic, payload=payload, qos=1)
print(f"Status publish: {(rc, mid)}")
if rc == 0:
return rc
time.sleep(0.3)
except Exception as error:
print(f"Fail publish error {error}")
print("fail publish")
return rc

def on_connect(self, client, userdata, flags, rc):
"""
The callback for when the client receives a CONNACK response from the server.
:param client: Client
:param userdata: Userdata
:param flags: flags
:param rc: rc
"""
try:
print(f"Connected with result code {str(rc)}")
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe(self.__subscribe)
except Exception as error:
print(f"Fail on connect error {error}")

def on_message(self, client, userdata, msg):
"""
The callback for when a PUBLISH message is received from the server.
:param client: Client
:param userdata: User data
:param msg: msg
"""
content = json.loads(msg.payload)
if self.on_message_callback:
self.on_message_callback(content) # Chama a função de callback com o conteúdo da mensagem

def stop(self):
"""
Stop process
"""
print("Stop mqtt client")
self.client.loop_stop()
# self.thread.join()

def run(self):
"""
Run process
"""
self.thread.daemon = True
self.thread.start()
54 changes: 50 additions & 4 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,33 @@
#!/usr/bin/env python3
# coding=utf-8
"""
Main program for datalogger interface
"""

# region import
import datetime
import time
import json
import os
import sys

from interface.SerialLogger import logger
from BluetoothGpsAgrinavi import BluetoothGpsAgrinavi
from BLEConnector import BLEConnector

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
script_path = os.path.dirname(os.path.abspath(__file__))
parent_path = os.path.abspath(os.path.join(script_path, os.pardir))

from src.interface.SerialLogger import logger
from src.interface.BluetoothGpsAgrinavi import BluetoothGpsAgrinavi
from src.interface.BLEConnector import BLEConnector
from src.interface.mqtt_manager import MqttManager
# endregion




def handle_message(message_content):
print("Received message:", message_content)


def LoadJSON():

try:
Expand All @@ -32,6 +48,25 @@ def init_data_instances(datajson):
Load file of configuration and start multiples instances serial datalooger
"""


server_mqtt = datajson["server_mqtt"]

print(server_mqtt)

if server_mqtt is not None:
# Criar uma instância de MqttManager com a função de callback
mqtt_manager = MqttManager(
username=server_mqtt['username'],
password=server_mqtt['password'],
server=server_mqtt['server'],
port=server_mqtt['port'],
client='client1',
subscribe=server_mqtt['subscribe'],
message_callback=handle_message)

mqtt_manager.start() # Iniciar o cliente MQTT


try:

# After JSON LOAD
Expand All @@ -58,6 +93,17 @@ def init_data_instances(datajson):
baudrate=devices[index]['baudrate'],
timeout=devices[index]['timeout'])

elif "serial-to-mqtt" in devices[index]['interface']:
devs.append(
logger(
description=devices[index]['description'])
)
# Execute process
devs[index].run(
port=devices[index]['serialport'],
baudrate=devices[index]['baudrate'],
timeout=devices[index]['timeout'])

# Bluetooth interface
elif "bluetooth-gps" in devices[index]['interface']:
devs.append(
Expand Down

0 comments on commit cb4cfe1

Please sign in to comment.