-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt_handler.py
55 lines (46 loc) · 2.32 KB
/
mqtt_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import logging
import queue
import threading
import time
from typing import Any
import paho.mqtt.client as mqtt
from paho.mqtt.client import ConnectFlags
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
from paho.mqtt.reasoncodes import ReasonCode
class MqttHandler:
def __init__(self, config: dict) -> None:
self.topic_prefix: str = config.get('mqtt_topic', 'sungrowmodbus2mqtt/').rstrip('/') + '/'
self.host: str = config['mqtt_server']
self.port: int = config.get('mqtt_port', 1883)
self.mqttc: mqtt.Client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
self.mqttc.on_connect = self.on_connect
self.mqttc.username_pw_set(config['mqtt_username'], config['mqtt_password'])
self.mqttc.will_set(self.topic_prefix + 'available', 'offline', retain=True)
self.mqttc.connect_async(host=self.host, port=self.port)
self.mqttc.loop_start()
self.publishing_queue: queue.Queue[dict[str, Any]] = queue.Queue()
self.publishing_thread: threading.Thread = threading.Thread(target=self.publishing_handler, daemon=True)
self.publishing_thread.start()
def on_connect(self, client: mqtt.Client, userdata: Any, connect_flags: ConnectFlags, reason_code: ReasonCode,
properties: Properties | None) -> None:
self.mqttc.publish(self.topic_prefix + 'available', 'online', retain=True)
if reason_code == ReasonCode(PacketTypes.CONNACK, 'Success'):
logging.info('mqtt connected.')
else:
logging.error(f'mqtt connection to %s:%s failed, %s.', self.host, self.port, reason_code)
def publish(self, topic: str, payload: str | int | float, retain: bool = False) -> None:
self.publishing_queue.put({
'topic': self.topic_prefix + topic,
'payload': payload,
'retain': retain,
})
def publishing_handler(self) -> None:
while True:
message: dict[str, Any] = self.publishing_queue.get()
while not self.mqttc.is_connected():
time.sleep(1)
result: mqtt.MQTTMessageInfo = self.mqttc.publish(**message)
if result.rc != mqtt.MQTT_ERR_SUCCESS:
logging.error(f'mqtt publish failed: %s %s.', message, result)
self.publishing_queue.task_done()