-
Notifications
You must be signed in to change notification settings - Fork 1
/
buttonbmc.py
213 lines (177 loc) · 8.38 KB
/
buttonbmc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#!/usr/bin/env python
import logging
import argparse
import sys
import asyncio
import asyncbmc
# Default button behaviour. ButtonBmc.__init__ applies caller-supplied
# overrides on top of these values. Every *_duration value ends up as an
# argument to asyncio.sleep(), so durations are expressed in seconds.
BUTTON_CONFIG = dict(
    # Initial values; ButtonBmc copies them onto the instance, but nothing
    # else in this file reads them (presumably used by subclasses -- confirm).
    initial_power_status_value=False,
    initial_power_button_value=False,
    initial_reset_button_value=False,
    # How long the power button is held for an abrupt power off.
    power_off_press_duration=10,
    # How long the power button is held to power on.
    power_on_press_duration=1,
    # Power cycle: hold to switch off, pause, then hold to switch back on.
    power_cycle_off_press_duration=5,
    power_cycle_wait_duration=2,
    power_cycle_on_press_duration=1,
    # How long the reset button is held.
    power_reset_press_duration=1,
    # Polite shutdown: short press, then wait before re-reading power state.
    power_shutdown_press_duration=1,
    power_shutdown_wait_duration=20,
)
class Button(asyncbmc.AsyncStatus):
    """A button whose state is driven for a fixed amount of time.

    State is read and written through the ``get_value`` / ``set_value``
    coroutines, which are not defined here (presumably inherited from
    ``asyncbmc.AsyncStatus`` -- confirm against that module).
    """

    async def toggle(self, toggle_duration: int):
        """Invert the current value for a while, then restore it.

        Args:
            toggle_duration: Seconds to hold the inverted value; must be >= 0.

        Returns:
            Whatever ``set_value`` returns when the original value is restored.

        Raises:
            ValueError: If ``toggle_duration`` is negative.
        """
        # Explicit check instead of ``assert`` so it still runs under ``-O``.
        if toggle_duration < 0:
            raise ValueError(
                'toggle_duration must be >= 0, got {!r}'.format(toggle_duration))
        value = await self.get_value()
        await self.set_value(not value)
        # The ``loop`` argument of asyncio.sleep() was removed in Python 3.10;
        # the sleep runs on the loop that is executing this coroutine.
        await asyncio.sleep(toggle_duration)
        return await self.set_value(value)

    async def press(self, press_duration: int):
        """Set the value to ``True`` for a while, then back to ``False``.

        Args:
            press_duration: Seconds to keep the value ``True``; must be >= 0.

        Returns:
            Whatever ``set_value`` returns when the value is set to ``False``.

        Raises:
            ValueError: If ``press_duration`` is negative.
        """
        # Explicit check instead of ``assert`` so it still runs under ``-O``.
        if press_duration < 0:
            raise ValueError(
                'press_duration must be >= 0, got {!r}'.format(press_duration))
        await self.set_value(True)
        # See toggle(): no ``loop`` argument, it no longer exists on 3.10+.
        await asyncio.sleep(press_duration)
        return await self.set_value(False)
class ButtonBmc(asyncbmc.AsyncBmc):
    """BMC that carries out power commands by pressing buttons.

    Subclasses must implement :meth:`setup_power_button` and
    :meth:`setup_reset_button`; presumably those assign ``self.power_button``
    and ``self.reset_button`` (both start out as ``None``) -- confirm in the
    concrete subclass.

    Power state values follow the checks made in this class: ``0`` means off
    and ``1`` means on, as reported by ``async_get_power_state`` (not defined
    here; presumably inherited from ``asyncbmc.AsyncBmc``).
    """

    def __init__(self, authdata, button_config: dict, name=None, port=623, loop=None):
        """Create the BMC and resolve the effective button configuration.

        Args:
            authdata: Passed through unchanged to ``asyncbmc.AsyncBmc``.
            button_config: Overrides applied on top of ``BUTTON_CONFIG``;
                ``None`` keeps the defaults.
            name: Passed through unchanged to ``asyncbmc.AsyncBmc``.
            port: Passed through unchanged to ``asyncbmc.AsyncBmc``.
            loop: Passed through unchanged to ``asyncbmc.AsyncBmc``.
        """
        asyncbmc.AsyncBmc.__init__(self, authdata, name=name, port=port, loop=loop)
        self.power_button: 'Button | None' = None
        self.reset_button: 'Button | None' = None

        # Button
        # Work on a copy: updating BUTTON_CONFIG itself would leak one
        # instance's overrides into every other instance and into the defaults.
        self.button_config = dict(BUTTON_CONFIG)
        if button_config is not None:
            self.button_config.update(button_config)
        config = self.button_config

        self.initial_power_status_value = config['initial_power_status_value']
        self.initial_power_button_value = config['initial_power_button_value']
        self.initial_reset_button_value = config['initial_reset_button_value']

        self.power_off_press_duration = config['power_off_press_duration']
        self.power_on_press_duration = config['power_on_press_duration']
        self.power_cycle_off_press_duration = config['power_cycle_off_press_duration']
        self.power_cycle_wait_duration = config['power_cycle_wait_duration']
        self.power_cycle_on_press_duration = config['power_cycle_on_press_duration']
        self.power_reset_press_duration = config['power_reset_press_duration']
        self.power_shutdown_press_duration = config['power_shutdown_press_duration']
        self.power_shutdown_wait_duration = config['power_shutdown_wait_duration']

    async def setup_power_button(self):
        """Prepare the power button; must be overridden by subclasses."""
        raise NotImplementedError

    async def setup_reset_button(self):
        """Prepare the reset button; must be overridden by subclasses."""
        raise NotImplementedError

    async def setup(self):
        """Set up both buttons, then run the parent class's ``setup``."""
        await self.setup_power_button()
        await self.setup_reset_button()
        return await super().setup()

    # directive 0
    async def press_power_off(self, press_duration):
        """Hold the power button to switch off, unless already off.

        Args:
            press_duration: Seconds to hold the power button.

        Returns:
            The power state read after the press, or the unchanged state when
            nothing was pressed.
        """
        powerstate = await self.async_get_power_state()
        if powerstate == 0:
            logging.info('already powered off')
        elif self.power_button is None:
            # Previously reported as "already powered off", which was wrong.
            logging.warning('unable to power off due to no power_button set')
        else:
            await self.power_button.press(press_duration)
            powerstate = await self.async_get_power_state()
        return powerstate

    async def async_power_off(self):
        """Power off with the configured press duration and check the result."""
        powerstate = await self.press_power_off(self.power_off_press_duration)
        assert powerstate == 0, \
            'power off failed, power state is {!r}'.format(powerstate)  # off

    def power_off(self):
        """Synchronous entry point for an abrupt power off."""
        logging.info('abruptly remove power, '
                     'using power off press duration: %s',
                     self.power_off_press_duration)
        # this should power down without waiting for clean shutdown
        return asyncbmc.wait_for_sync(self.async_power_off(), loop=self.loop)

    # directive 1
    async def press_power_on(self, press_duration):
        """Hold the power button to switch on, if currently off.

        Args:
            press_duration: Seconds to hold the power button.

        Returns:
            The power state read after the press, or the unchanged state when
            nothing was pressed.
        """
        powerstate = await self.async_get_power_state()
        if powerstate != 0:
            logging.info('already powered on')
        elif self.power_button is None:
            # Previously reported as "already powered on", which was wrong.
            logging.warning('unable to power on due to no power_button set')
        else:
            await self.power_button.press(press_duration)
            powerstate = await self.async_get_power_state()
        return powerstate

    async def async_power_on(self):
        """Power on with the configured press duration and check the result."""
        powerstate = await self.press_power_on(self.power_on_press_duration)
        assert powerstate == 1, \
            'power on failed, power state is {!r}'.format(powerstate)  # on

    def power_on(self):
        """Synchronous entry point for power on."""
        logging.info('power on, using power on press duration: %s',
                     self.power_on_press_duration)
        return asyncbmc.wait_for_sync(self.async_power_on(), loop=self.loop)

    # directive 2
    async def press_power_cycle(self, press_off_duration,
                                wait_duration, press_on_duration):
        """Switch off (if on), pause, then switch back on.

        Args:
            press_off_duration: Seconds to hold the power button to switch off.
            wait_duration: Seconds to pause between switching off and on.
            press_on_duration: Seconds to hold the power button to switch on.

        Returns:
            The power state returned by :meth:`press_power_on`.
        """
        powerstate = await self.async_get_power_state()
        if powerstate == 1:
            await self.press_power_off(press_off_duration)
            # asyncio.sleep() lost its ``loop`` argument in Python 3.10; the
            # pause is only needed when a power-off was actually issued.
            await asyncio.sleep(wait_duration)
            # assert_power_state(0)
        powerstate = await self.press_power_on(press_on_duration)
        return powerstate

    async def async_power_cycle(self):
        """Power cycle with the configured durations and check the result."""
        powerstate = await self.press_power_cycle(
            self.power_cycle_off_press_duration,
            self.power_cycle_wait_duration,
            self.power_cycle_on_press_duration)
        assert powerstate == 1, \
            'power cycle failed, power state is {!r}'.format(powerstate)  # on

    def power_cycle(self):
        """Synchronous entry point for a power cycle."""
        logging.info('power cycle, using power cycle off press duration: '
                     '%s, wait duration: %s, on press duration %s',
                     self.power_cycle_off_press_duration,
                     self.power_cycle_wait_duration,
                     self.power_cycle_on_press_duration)
        return asyncbmc.wait_for_sync(self.async_power_cycle(), loop=self.loop)

    # directive 3
    async def press_power_reset(self, press_duration):
        """Press the reset button, or fall back to a power cycle without one.

        Args:
            press_duration: Seconds to hold the reset button.

        Returns:
            The power state read once the reset (or fallback power cycle) is
            done. The original returned ``None``, which made the check in
            :meth:`async_power_reset` fail unconditionally.
        """
        if self.reset_button is not None:
            await self.reset_button.press(press_duration)
        else:
            logging.warning('unable to reset due to no reset_button set')
            # power_cycle
            await self.async_power_cycle()
        return await self.async_get_power_state()

    async def async_power_reset(self):
        """Reset with the configured press duration and check the result."""
        powerstate = await self.press_power_reset(self.power_reset_press_duration)
        assert powerstate == 1, \
            'power reset failed, power state is {!r}'.format(powerstate)  # on

    def power_reset(self):
        """Synchronous entry point for a reset."""
        logging.info('power reset, using power reset press duration: %s',
                     self.power_reset_press_duration)
        return asyncbmc.wait_for_sync(self.async_power_reset(), loop=self.loop)

    # directive 5
    async def press_power_shutdown(self, press_duration, wait_duration):
        """Press the power button and wait for the system to switch off.

        Args:
            press_duration: Seconds to hold the power button.
            wait_duration: Seconds to wait before re-reading the power state.

        Returns:
            The power state read after the wait, or the unchanged state when
            the system was not on.
        """
        # should attempt a clean shutdown
        powerstate = await self.async_get_power_state()
        if powerstate == 1:
            await self.press_power_off(press_duration)
            # polite wait (no ``loop`` argument: removed in Python 3.10)
            await asyncio.sleep(wait_duration)
            powerstate = await self.async_get_power_state()
        else:
            logging.info("already powered off")
        return powerstate

    async def async_power_shutdown(self):
        """Shut down with the configured durations and check the result."""
        powerstate = await self.press_power_shutdown(
            self.power_shutdown_press_duration,
            self.power_shutdown_wait_duration)
        assert powerstate == 0, \
            'power shutdown failed, power state is {!r}'.format(powerstate)  # off

    def power_shutdown(self):
        """Synchronous entry point for a polite shutdown."""
        logging.info('politely shut down the system, '
                     'using power shutdown press duration: '
                     '%s and polite wait duration: %s',
                     self.power_shutdown_press_duration,
                     self.power_shutdown_wait_duration)
        return asyncbmc.wait_for_sync(self.async_power_shutdown(), loop=self.loop)
def main():
    """Parse the command line, build a ButtonBmc and start listening.

    The only option is ``--port`` (integer, default 623). The BMC is created
    with empty authentication data and no button overrides.
    """
    cli = argparse.ArgumentParser(
        prog='buttonbmc',
        description='Generic Baseboard Management Controller with Button(s)',
        conflict_handler='resolve',
    )
    cli.add_argument(
        '--port',
        dest='port',
        type=int,
        default=623,
        help='Port to listen on; defaults to 623',
    )
    options = cli.parse_args()

    bmc = ButtonBmc({}, {}, port=options.port)
    bmc.listen()
# Script entry point: run main() and use its return value as the exit status.
if __name__ == '__main__':
    raise SystemExit(main())