-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
84 lines (71 loc) · 2.52 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
"""BrandMeister Event Feed"""
from dataclasses import asdict
from json import loads as json_loads
from socketio import AsyncClient
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant
from .data import Callee, Caller, Call
from .trigger import Trigger
DOMAIN = "bm_feed"
async def async_setup(hass: HomeAssistant, config: dict):
if DOMAIN in hass.data:
sio = hass.data[DOMAIN]
else:
hass.data[DOMAIN] = sio = AsyncClient()
async def setup():
await sio.connect(
url="https://api.brandmeister.network",
socketio_path="lh/socket.io",
transports="websocket",
)
await sio.wait()
task = hass.loop.create_task(setup())
async def teardown():
await sio.disconnect()
await task
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, teardown())
triggers = (
[Trigger(query) for query in config[DOMAIN]["triggers"]]
if "triggers" in config[DOMAIN]
else None
)
@sio.on("mqtt")
async def _handler(data: dict):
event = None
if data["topic"] == "LH":
payload = json_loads(data["payload"])
if (
payload["Event"] == "Session-Stop"
and "Call" in payload["CallTypes"]
and (
payload["LinkCall"]
+ payload["SourceCall"]
+ payload["DestinationCall"]
)
!= ""
):
event = (
"call",
Call(
Caller(
int(payload["SourceID"]),
str(payload["SourceCall"]),
),
Callee(
int(payload["DestinationID"]),
str(payload["DestinationCall"]),
"Group" in payload["CallTypes"],
),
),
)
if event:
if triggers is None:
hass.bus.async_fire(f"{DOMAIN}/{event[0]}", event[1])
else:
triggered = [str(trigger) for trigger in triggers if trigger(event[1])]
if triggered:
hass.bus.async_fire(
f"{DOMAIN}/{event[0]}",
{**asdict(event[1]), "triggers": triggered},
)
return True