-
Notifications
You must be signed in to change notification settings - Fork 6
/
eco_can_tools.py
152 lines (129 loc) · 4.24 KB
/
eco_can_tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import paho.mqtt.client
import traceback
import time
"""
Helpers for Buderus ECO CAN
"""
def bitidx(b):
    """Render a CAN id as ``"<id> <flag> <upper> <lower>"``.

    ``<id>`` is the id as three hex digits, ``<flag>`` is 1 when bit 10
    (0x400) is set and 0 otherwise, ``<upper>`` is bits 5-9 and ``<lower>``
    is bits 0-4, both as unpadded hex.
    """
    flag = (b & 0x400) == 0x400
    upper = (b >> 5) & 0b11111
    lower = b & 0b11111
    return "{0:03x} {1:b} {2:x} {3:x}".format(b, flag, upper, lower)
def dec_can_id(i):
    """Split a CAN id into the tuple ``(d, s, mon)``.

    ``s`` is taken from bits 0-4, ``d`` from bits 5-9, and ``mon`` is 1 when
    bit 10 (0x400) is set, otherwise 0.
    """
    field_mask = 0x1F
    mon = 1 if i & 0x400 else 0
    return (i >> 5) & field_mask, i & field_mask, mon
def enc_can_id(d, s, mon=0):
    """Pack ``d``, ``s`` and ``mon`` into a CAN id.

    ``s`` occupies bits 0-4 and ``d`` bits 5-9 (both are truncated to five
    bits); ``mon`` is shifted to bit 10 without masking.
    """
    field_mask = 0x1F
    return (mon << 10) | ((d & field_mask) << 5) | (s & field_mask)
def stridx(i):
    """Format CAN id *i* in hex: the id (3 digits), then mon, d and s."""
    template = "{0:03x} {1:x} {2:02x} {3:02x}"
    dst, src, mon = dec_can_id(i)
    return template.format(i, mon, dst, src)
def stridb(i):
    """Format CAN id *i* as hex (3 digits) followed by mon, d and s in binary.

    ``d`` and ``s`` are zero-padded to five binary digits each.
    """
    template = "{0:03x} {1:b} {2:05b} {3:05b}"
    dst, src, mon = dec_can_id(i)
    return template.format(i, mon, dst, src)
def dec_can_msg(l):
    """Parse one raw dump line into ``(rtr, mon, d, s, typ, offs, mem)``.

    The topic prefix ``/heizung/burner/can/raw/recv/`` is removed if present;
    the remainder is expected to look like ``<rtr>;<hex id>;<hex data bytes>``.
    The first data byte is returned as ``typ``, the second as ``offs`` and the
    rest as ``mem`` (a ``bytes`` object).

    Raises ValueError or IndexError when the line does not have that shape or
    carries fewer than two data bytes.
    """
    fields = l.replace("/heizung/burner/can/raw/recv/", "").split(";")
    rtr = int(fields[0])
    can_id = int(fields[1], base=16)
    frame = bytes.fromhex(fields[2])
    d, s, mon = dec_can_id(can_id)
    typ, offs = frame[0], frame[1]
    return rtr, mon, d, s, typ, offs, frame[2:]
def str_msg(l, filt=None):
    """Decode dump line *l* and return its formatted text, or None if filtered.

    *filt*, when given, is called with the decoded fields as keyword arguments
    (``rtr``, ``mon``, ``d``, ``s``, ``typ``, ``offs``, ``mem``); a falsy result
    suppresses the message.
    """
    decoded = dec_can_msg(l)
    if filt:
        names = ("rtr", "mon", "d", "s", "typ", "offs", "mem")
        if not filt(**dict(zip(names, decoded))):
            return None
    return format_msg(*decoded)
def format_msg(rtr, mon, d, s, typ, offs, mem, *, long_format=False):
    """Format decoded CAN message fields as a single line of text.

    Parameters:
        rtr, mon: printed as unpadded hex.
        d, s, typ, offs: printed as two-digit hex.
        mem: iterable of ints; each is printed as two-digit hex, separated
            by single spaces.
        long_format: when true, use the verbose labels
            (``rtr: mon: dst: src: typ: off: dat:``) instead of the default
            one-letter labels.

    Returns:
        The formatted string (without a trailing newline).
    """
    if long_format:
        template = "rtr:{0:x} mon:{1:x} dst:{2:02x} src:{3:02x} typ:{4:02x} off:{5:02x} dat:{6:s}"
    else:
        # NOTE: the short layout labels both rtr and d with "r:". It is kept
        # unchanged because existing output/consumers may rely on it.
        template = "r:{0:x} m:{1:x} r:{2:02x} s:{3:02x} t:{4:02x} o:{5:02x} d:{6:s}"
    payload = " ".join("{0:02x}".format(b) for b in mem)
    return template.format(rtr, mon, d, s, typ, offs, payload)
def dec_file(fn, fo=None):
    """Decode every message line of dump file *fn* into text file *fo*.

    Parameters:
        fn: path of the raw dump file to read.
        fo: path of the output file; defaults to ``fn + ".dec"`` when empty
            or None.

    Blank lines in the input are skipped (as stdio_can_dec does) instead of
    aborting the whole conversion with a parse error. Malformed non-blank
    lines still raise.
    """
    if not fo:
        fo = fn + ".dec"
    with open(fn) as fr, open(fo, "w") as fw:
        fw.writelines(str_msg(l) + "\n" for l in fr if l.strip())
# Example: print(dec_can_msg("/heizung/burner/can/raw/recv/ 0;38;c3 06 01 ff 19 5a 01 8e"))
def stdio_can_dec(filt=None):
    """Decode dump lines from stdin and write the formatted text to stdout.

    Reads until end of input. Blank lines are ignored. A line that fails to
    decode produces the exception's message instead of aborting. *filt* is
    passed through to str_msg; filtered messages produce no output.
    """
    import sys

    # iter() with a "" sentinel stops at EOF, where readline() returns "".
    for line in iter(sys.stdin.readline, ""):
        if not line.strip():
            continue
        try:
            text = str_msg(line, filt)
        except Exception as err:
            text = str(err)
        if text:
            sys.stdout.write(text + "\n")
def sniff_can(filt=None):
    """Subscribe to the raw CAN receive topic and print decoded messages.

    Connects a fresh MQTT client to "pi3.lan" and blocks in its network loop.
    Each received payload is decoded with str_msg (using *filt*); decoding
    errors are printed as tracebacks and do not stop the loop.
    Ctrl-C disconnects and returns; any other exception disconnects and is
    re-raised.
    """
    client = paho.mqtt.client.Client()

    def on_connect(client, userdata, flags, rc, properties=None):
        print("Connected ", rc)
        client.subscribe("/heizung/burner/can/raw/recv/")

    def on_message(client, userdata, msg):
        try:
            text = str_msg(msg.payload.decode(), filt)
            if text:
                print(text)
        except Exception:
            traceback.print_exc()

    def on_disconnect(client, userdata, rc):
        print("Disconnected ", rc)

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.connect("pi3.lan")
    try:
        client.loop_forever()
    except KeyboardInterrupt:
        client.disconnect()
    except BaseException:
        client.disconnect()
        raise
def filt_no_mon(**kwargs):
    """Filter predicate: accept only messages whose ``mon`` field equals False (0)."""
    # Equality (not truthiness) is kept on purpose to match the existing behaviour.
    return kwargs["mon"] == False  # noqa: E712
def filt_bcast(**kwargs):
    """Filter predicate: accept only messages whose ``d`` field is 0."""
    return kwargs["d"] == 0
def filt_something_new(**k):
    """Filter predicate: non-``mon`` messages whose ``typ`` is not in the excluded set.

    Excluded ``typ`` values are 0, 1, 0xca, 0xcb, 0xc3 and 0xc0.
    """
    excluded_types = (0, 1, 0xca, 0xcb, 0xc3, 0xc0)
    t = k["typ"]
    return k["mon"] == False and t not in excluded_types  # noqa: E712
def enc_can_msg(d, s, typ, offs, mem, mon=0, rtr=0):
    """Build the text payload ``<rtr>;<id>;<data bytes>`` for a CAN message.

    Parameters:
        d, s, mon: combined into the CAN id via enc_can_id.
        typ, offs: first and second data byte.
        mem: iterable of further data bytes (ints).
        rtr: value of the leading rtr field.

    Returns:
        The payload string; rtr and id are unpadded hex, data bytes are
        space-separated two-digit hex.
    """
    can_id = enc_can_id(d, s, mon)
    # Bytes are zero-padded to two hex digits so the payload matches the
    # receive-side layout and can be parsed back by dec_can_msg
    # (bytes.fromhex rejects single-digit bytes such as "9").
    data = " ".join("{0:02x}".format(b) for b in (typ, offs, *mem))
    return "{rtr:x};{i:x};{d:s}".format(rtr=rtr, i=can_id, d=data)
def send_can_msg(d, s, typ, offs, mem, mon=0, rtr=0):
    """Encode a CAN message and publish it on the raw send topic.

    Parameters:
        d, s, typ, offs, mem, mon, rtr: passed to enc_can_msg; *mem* must
            contain exactly six items.

    Returns:
        The result of ``wait_for_publish()`` on the publish handle.

    Raises:
        ValueError: if *mem* does not have exactly six items.

    Uses the module-level MQTT ``client`` and connects it first if it is not
    connected. The message is published with QoS 2.
    """
    if len(mem) != 6:
        # The exception used to be constructed but never raised, so the
        # length check had no effect.
        raise ValueError("mem must be 6 long")
    mqt = "/heizung/burner/can/raw/send"
    mqm = enc_can_msg(d, s, typ, offs, mem, mon, rtr)
    if not client.is_connected():
        connect()
    return client.publish(mqt, mqm, 2).wait_for_publish()
def set_HK_mode(hkid, mode):
    """Send the two-message sequence that writes *mode* for *hkid*.

    First sends a type 0xfb / offset 9 message from 0x11 to 0x01, waits one
    second, then sends a message of type *hkid* at offset 0 whose fifth
    payload byte is *mode*; the other payload bytes are 0x65.
    NOTE(review): "HK" presumably means heating circuit and 0x65 presumably
    marks "leave unchanged" - confirm against the protocol notes.
    """
    src = 0x11
    dst = 0x1
    send_can_msg(dst, src, 0xfb, 9, [1, 1, 0, 0, 0, 0])
    time.sleep(1)
    send_can_msg(dst, src, hkid, 0, [0x65, 0x65, 0x65, 0x65, mode, 0x65])
def request_settings(r):
    """Ask the *r*eceiver to dump its settings.

    Sends a type 0xfb / offset 1 message with six zero bytes from 17 (0x11)
    to *r*.
    """
    send_can_msg(r, 17, 0xfb, 1, [0, 0, 0, 0, 0, 0])
def connect():
    """Connect the module-level MQTT client to "pi3.lan" and start its network loop."""
    client.connect("pi3.lan")
    client.loop_start()
def disconnect():
    """Disconnect the module-level MQTT client, then stop its network loop."""
    # Order matters: request the disconnect before stopping the loop.
    client.disconnect()
    client.loop_stop()
# Module-level MQTT client shared by connect(), disconnect() and send_can_msg().
client = paho.mqtt.client.Client()

if __name__ == "__main__":
    # Nothing runs by default; the lines below are kept as commented-out
    # entry points from the original script.
    pass
    # mqtt_can_dec(filt_no_mon)
    # stdio_can_dec()