-
Notifications
You must be signed in to change notification settings - Fork 3
/
adapt.py
93 lines (76 loc) · 3.23 KB
/
adapt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import logging
from api_request import ApiRequest
from config import config
"""
All communication goes through the two methods update() and set(); the software can be adapted to other hardware
by customizing these two methods. update() must read the current values from the hardware. set() is called by
the application with the three parameters (stop, phase, amp) and must send them to the wallbox.
In this implementation, communication does not take place directly with the wallbox, because a separate application
(meterhub) already provides the complete status of the wallbox. This way the wallbox is not burdened unnecessarily
by additional queries. The control commands are also sent to the wallbox via meterhub, which avoids simultaneous
queries and commands.
"""
class Adapt:
    """
    Adapter between the charging application and the wallbox.

    Values are read from, and commands are sent through, the meterhub
    service reachable at config['meterhub_address'].
    """

    # Charging-current window (ampere) that set() forwards to the wallbox.
    AMP_MIN = 6
    AMP_MAX = 16

    def __init__(self):
        """Create the logger and the meterhub API client."""
        self.log = logging.getLogger("adapt")
        self.meterhub = ApiRequest(config['meterhub_address'], timeout=0.5, lifetime=10, log_name='meterhub')

    def update(self, app, post=None):
        """
        Query the wallbox and the necessary electricity meters.

        Reads the current data set from meterhub and copies the wallbox
        values onto ``app`` (state, amp, wb_phase, power, charge_energy,
        stop). Also computes ``app.excess_p``, the available excess power.

        :param app: application object that receives the values as attributes
        :param post: optional payload handed through to ``meterhub.read()``
        """
        hub = self.meterhub
        d = hub.read(post)
        self.log.debug("read: %s", d)

        app.state = hub.get('car_state')
        app.amp = hub.get('car_amp')
        app.wb_phase = hub.get('car_phase')  # phase from wallbox 1/3
        app.power = hub.get('car_p')
        app.charge_energy = hub.get('car_e_cycle')
        app.stop = hub.get('car_stop')

        # Calculation of the available excess: negated grid power plus the
        # power the car is currently drawing.
        p = -hub.get('grid_p', 0) + hub.get('car_p', 0)
        bat_p = hub.get('bat_p', 0)
        if bat_p < 0:
            # Negative battery power is added as-is, which reduces the excess.
            p = p + bat_p
        app.excess_p = p

    def set(self, stop=None, phase=None, amp=None):
        """
        Send commands to the wallbox (via meterhub).

        Only the parameters that are given (not None) and valid are sent;
        invalid values are ignored and reported with a warning. If nothing
        remains to be sent, no request is made.

        :param stop: True = force wallbox to stop (frc=1),
                     False = release wallbox (frc=0)
        :param phase: desired phase operation, 1 = 1-phase (psm=1),
                      3 = 3-phase (psm=2)
        :param amp: charging current in ampere, accepted range 6..16

        Wallbox:
        api/set?amp=6&frc=0&psm=1
        """
        params = []

        if amp is not None:
            if self.AMP_MIN <= amp <= self.AMP_MAX:
                params.append('amp={}'.format(amp))
            else:
                self.log.warning("set wallbox: ignoring amp=%r (allowed %d..%d)",
                                 amp, self.AMP_MIN, self.AMP_MAX)

        # Identity checks on purpose: only real booleans trigger frc.
        if stop is True:
            params.append('frc=1')
        elif stop is False:
            params.append('frc=0')

        if phase is not None:
            if phase == 1:
                params.append('psm=1')
            elif phase == 3:
                params.append('psm=2')
            else:
                self.log.warning("set wallbox: ignoring phase=%r (allowed 1 or 3)", phase)

        if not params:
            return

        cmd = '/command/goe?' + "&".join(params)
        if self.meterhub.read(url_extension=cmd):
            self.log.debug("set wallbox command: '%s'", cmd)
        else:
            self.log.error("set wallbox command: '%s' failed", cmd)
if __name__ == "__main__":
    # Manual smoke test: poll meterhub every two seconds and print the values
    # that Adapt.update() writes onto a minimal stand-in application object.
    import time

    class App:
        """Minimal stand-in for the application object filled by Adapt.update()."""

        def __init__(self):
            self.state = None
            self.amp = None
            self.phase = None
            self.wb_phase = None  # written by Adapt.update() (phase from wallbox)
            self.power = None
            self.charge_energy = None
            self.stop = None
            self.excess_p = None

    app = App()
    io = Adapt()
    while True:
        io.update(app)
        print(app.__dict__)
        time.sleep(2)