From f35233961cad37772c81cf77c9943e5b1d740367 Mon Sep 17 00:00:00 2001 From: DigiH Date: Wed, 19 Jun 2024 14:10:50 +0200 Subject: [PATCH] Device Tracker sync across gateways Device Tracker sync across gateways - OpenMQTTGateway & Theengs Gateway --- TheengsGateway/__init__.py | 7 +++ TheengsGateway/ble_gateway.py | 92 +++++++++++++++++++++++++---------- TheengsGateway/config.py | 2 +- TheengsGateway/discovery.py | 11 ++++- 4 files changed, 85 insertions(+), 27 deletions(-) diff --git a/TheengsGateway/__init__.py b/TheengsGateway/__init__.py index ece2e39..2846a82 100644 --- a/TheengsGateway/__init__.py +++ b/TheengsGateway/__init__.py @@ -19,6 +19,7 @@ """ import sys +import uuid from pathlib import Path from .ble_gateway import run @@ -48,6 +49,12 @@ def main() -> None: if configuration["discovery_topic"].endswith("/sensor"): configuration["discovery_topic"] = configuration["discovery_topic"][:-7] + # Get the MAC address of the gateway. + mac_address = uuid.UUID(int=uuid.getnode()).hex[-12:] + configuration["gateway_id"] = ":".join( + [mac_address[i : i + 2] for i in range(0, 12, 2)] + ).upper() + if not configuration["host"]: sys.exit("MQTT host is not specified") diff --git a/TheengsGateway/ble_gateway.py b/TheengsGateway/ble_gateway.py index 0c8b3dd..8de95b6 100644 --- a/TheengsGateway/ble_gateway.py +++ b/TheengsGateway/ble_gateway.py @@ -151,6 +151,7 @@ def on_connect( retain=True, ) self.subscribe(self.configuration["subscribe_topic"]) + self.subscribe("theengs/internal/#") else: logger.error( "Failed to connect to MQTT broker %s:%d reason code: %s", @@ -226,28 +227,51 @@ def subscribe(self, sub_topic: str) -> None: """Subscribe to MQTT topic .""" def on_message(client, userdata, msg) -> None: # noqa: ANN001,ARG001 - logger.info( - "Received `%s` from `%s` topic", - msg.payload.decode(), - msg.topic, - ) - try: - msg_json = json.loads(msg.payload.decode()) - except (json.JSONDecodeError, UnicodeDecodeError) as exception: - logger.warning( - "Invalid JSON message %s: %s", msg.payload.decode(), exception - ) - return + # Theengs internal + if "theengs/internal/" in msg.topic: + # Evaluate trackersync messages + if msg.topic == "theengs/internal/trackersync": + msg_json = json.loads(msg.payload) + logger.debug("trackersync message: %s", msg_json) + + if ( + msg_json["gatewayid"] != self.configuration["gateway_id"] + and msg_json["tracker"] in self.discovered_trackers + and self.discovered_trackers[msg_json["tracker"]].time != 0 + ): + self.discovered_trackers[msg_json["tracker"]].time = 0 + logger.debug( + "Tracker %s disassociated by gateway %s", + msg_json["tracker"], + msg_json["gatewayid"], + ) - try: - msg_json["id"] = self.rpa2id(msg_json["id"]) - except KeyError: - logger.warning( - "JSON message %s doesn't contain id", msg.payload.decode() + logger.debug( + "[DIS] Discovered Trackers: %s", self.discovered_trackers + ) + else: + logger.info( + "Received `%s` from `%s` topic", + msg.payload.decode(), + msg.topic, ) - return + try: + msg_json = json.loads(msg.payload.decode()) + except (json.JSONDecodeError, UnicodeDecodeError) as exception: + logger.warning( + "Invalid JSON message %s: %s", msg.payload.decode(), exception + ) + return + + try: + msg_json["id"] = self.rpa2id(msg_json["id"]) + except KeyError: + logger.warning( + "JSON message %s doesn't contain id", msg.payload.decode() + ) + return - self.decode_advertisement(msg_json) + self.decode_advertisement(msg_json) self.client.subscribe(sub_topic) self.client.on_message = on_message @@ -369,14 +393,11 @@ def check_tracker_timeout(self) -> None: if ( round(time()) - time_model.time >= self.configuration["tracker_timeout"] and time_model.time != 0 - and ( - self.configuration["discovery"] - or self.configuration["general_presence"] - ) ): if ( time_model.model_id in ("APPLEWATCH", "APPLEDEVICE") and not self.configuration["discovery"] + and self.configuration["general_presence"] ): message = json.dumps( {"id": address, "presence": "absent", "unlocked": False} @@ -390,9 +411,12 @@ def check_tracker_timeout(self) -> None: + "/" + address.replace(":", ""), ) + time_model.time = 0 self.discovered_trackers[address] = time_model - logger.debug("Discovered Trackers: %s", self.discovered_trackers) + + logger.info("Tracker %s timed out", address) + logger.debug("[TO] Discovered Trackers: %s", self.discovered_trackers) async def ble_scan_loop(self) -> None: """Scan for BLE devices.""" @@ -439,6 +463,10 @@ async def ble_scan_loop(self) -> None: "Sent %s messages to MQTT", self.published_messages, ) + + # Check tracker timeouts + self.check_tracker_timeout() + await asyncio.sleep( self.configuration["ble_time_between_scans"], ) @@ -610,11 +638,25 @@ def publish_json( + "/" + get_address(data_json).replace(":", ""), ) + + # Update tracker last received time self.discovered_trackers[str(data_json["id"])] = TnM( round(time()), str(data_json["model_id"]), ) - logger.debug("Discovered Trackers: %s", self.discovered_trackers) + # Publish trackersync message + message = json.dumps( + { + "gatewayid": self.configuration["gateway_id"], + "tracker": data_json["id"], + } + ) + self.publish( + message, + "theengs/internal/trackersync", + ) + + logger.debug("[GP] Discovered Trackers: %s", self.discovered_trackers) # Remove "track" if PUBLISH_ADVDATA is 0 if not self.configuration["publish_advdata"] and "track" in data_json: diff --git a/TheengsGateway/config.py b/TheengsGateway/config.py index 33d8db9..fc1f3f8 100644 --- a/TheengsGateway/config.py +++ b/TheengsGateway/config.py @@ -20,7 +20,7 @@ "port": 1883, "user": "", "pass": "", - "ble_scan_time": 5, + "ble_scan_time": 7, "ble_time_between_scans": 5, "publish_topic": "home/TheengsGateway/BTtoMQTT", "lwt_topic": "home/TheengsGateway/LWT", diff --git a/TheengsGateway/discovery.py b/TheengsGateway/discovery.py index 69ca8c4..1a7e1f4 100644 --- a/TheengsGateway/discovery.py +++ b/TheengsGateway/discovery.py @@ -272,8 +272,17 @@ def copy_pub_device(self, device: dict) -> dict: self.discovered_trackers[device["id"]] = TnM( round(time()), device["model_id"] ) - logger.debug("Discovered Trackers: %s", self.discovered_trackers) + # Publish trackersync message + message = json.dumps( + {"gatewayid": self.configuration["gateway_id"], "tracker": device["id"]} + ) + self.publish( + message, + "theengs/internal/trackersync", + ) + + logger.debug(" Discovered Trackers: %s", self.discovered_trackers) pub_device_copy = device.copy() # Remove "track" if PUBLISH_ADVDATA is 0 if not self.configuration["publish_advdata"] and "track" in pub_device_copy: