-
Notifications
You must be signed in to change notification settings - Fork 1
/
scheduler.py
380 lines (325 loc) · 15.4 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
#!/bin/env python3
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.job import Job
from apscheduler import events
import time
from datetime import datetime
import os
import data_handler
import source.device_manager.experiment as experiment
from source.device_manager.experiment import ExperimentStatus
from source.device_manager.device_manager import DeviceManager
from source.device_manager.script import Script, get_user_script
from source.device_manager.device import get_device_info
from source.device_manager.data_directories import TEMP_DIRECTORY
import redis
import msgpack
from dataclasses import dataclass, asdict
from enum import IntEnum
import queue
from collections import deque
from typing import Optional
import docker
import docker_helper
from threading import Thread
import json
# Maximum number of log lines kept per experiment. Used both as the ``maxlen``
# of each experiment's log deque and as the trim threshold when forwarding logs.
EXPERIMENT_LOG_BUFFER_LENGTH: int = 100
@dataclass
class ExperimentState:
    """In-memory bookkeeping record for one experiment known to this module.

    Instances are created positionally when an experiment is scheduled, so the
    field order below is part of the interface.
    """
    # Id of the experiment; also the key under which the record is stored.
    experiment_id: int
    # Name of the experiment, used in console messages.
    name: str
    # ``job.id`` of the scheduler job that will start the experiment.
    # NOTE(review): APScheduler job ids are normally strings - confirm annotation.
    job_id: int
    # Docker container id; initialised to '0' and replaced when a STARTED
    # process status event is handled.
    container_id: str
    # Planned start; converted with ``datetime.fromtimestamp`` when scheduling.
    start_time: int
    # Planned end (presumably the same unit as ``start_time`` - TODO confirm).
    end_time: int
    # Current status as last set by ``change_experiment_status``.
    status: ExperimentStatus
    # Most recent container log lines that are forwarded to Redis.
    logs: deque
class ProcessStatusEventType(IntEnum):
    """Kinds of events reported about an experiment's container process."""
    # Container was started; the event message carries the container id.
    STARTED = 0
    # Container exited with status code 0.
    FINISHED_SUCCESSFUL = 1
    # Not emitted by any code visible in this file.
    FINISHED_MANUALLY = 2
    # Container exited with a non-zero status code.
    ERROR = 3
@dataclass
class ProcessStatusEvent:
    """Message passed through the process status queue to the main loop."""
    # Id of the experiment the event refers to.
    experiment_id: int
    # What happened to the experiment's container.
    event_type: ProcessStatusEventType
    # Optional payload; for STARTED events this is the Docker container id.
    message: Optional[str] = None
# Scheduler events pushed by ``event_listener`` and drained by ``main``.
event_queue = queue.SimpleQueue()
# ``ProcessStatusEvent`` items produced while starting/watching containers and
# drained by ``main``.
process_status_queue = queue.SimpleQueue()
# experiment id -> ExperimentState
experiments = {}
# scheduler job id -> experiment id
job_to_experiment = {}
# experiment id -> jobs returned by ``data_handler.create_jobs``
experiment_id_to_data_handler_jobs = {}
# Scheduler that runs ``start_experiment`` jobs.
scheduler = BackgroundScheduler()
# Redis connection used to publish status/log updates; ``pubsub`` receives
# commands on the channel subscribed to in ``main``.
redis_connection = redis.Redis(host='localhost')
pubsub = redis_connection.pubsub()
def change_experiment_status(experiment_id: int, status: ExperimentStatus):
    """
    Store a new status for a tracked experiment and announce it on Redis.

    :param experiment_id: Id of an experiment present in ``experiments``
    :type experiment_id: int
    :param status: The status to record and publish
    :type status: ExperimentStatus
    :raises KeyError: If the experiment is not tracked
    """
    state = experiments[experiment_id]
    state.status = status
    # Published on the 'experiment_status' channel as a msgpack-encoded map.
    payload = msgpack.packb({'experimentId': experiment_id, 'status': status})
    redis_connection.publish('experiment_status', payload)
def forward_experiment_log(experiment_id: int, logging_message: str):
    """
    Append a container log line to the experiment's log buffer and publish the
    whole buffer on the 'experiment_logs' Redis channel.

    The buffer never holds more than ``EXPERIMENT_LOG_BUFFER_LENGTH`` entries
    after the append; the oldest entries are discarded first.

    :param experiment_id: The internally assigned id of the experiment
    :type experiment_id: int
    :param logging_message: The latest log line read from the docker container
    :type logging_message: str
    :raises KeyError: If the experiment is not tracked in ``experiments``
    """
    # Todo: Parse the docker python string into a dict and transfer dict with
    #  levelname, timestamp, log message ...
    logs = experiments[experiment_id].logs
    # Make room before appending. The previous if/elif/else had an unreachable
    # else branch because "< limit" and ">= limit" already cover every case.
    while logs and len(logs) >= EXPERIMENT_LOG_BUFFER_LENGTH:
        logs.popleft()
    logs.append(logging_message)
    redis_connection.publish(
        'experiment_logs',
        msgpack.packb({
            'experimentId': experiment_id,
            'logList': list(logs)
        }))
def _register_active_calls(calls_to_make, device_uuid, booking_end, feature, candidates):
    """Add every active entry of *candidates* to the nested *calls_to_make* map.

    The map is keyed by ``(polling interval, booking end)`` and then by device
    uuid; each leaf is a list of ``(command_or_property, feature)`` tuples.
    """
    for candidate in candidates:
        if not candidate.active:
            continue
        if candidate.meta:
            interval_to_use = candidate.polling_interval_meta
        else:
            interval_to_use = candidate.polling_interval_non_meta
        per_device = calls_to_make.setdefault((interval_to_use, booking_end), {})
        per_device.setdefault(device_uuid, []).append((candidate, feature))


def start_data_handling_for_experiment(exp: experiment.Experiment):
    """
    Create the data handler jobs for all devices booked by an experiment.

    Active commands and properties of every feature of every booked device are
    grouped by ``(polling interval, booking end)`` and device uuid, handed to
    ``data_handler.create_jobs`` and the returned jobs are remembered under the
    experiment id (appended if jobs for that experiment already exist).

    :param exp: The experiment whose device bookings should be polled
    :type exp: experiment.Experiment
    """
    device_manager = DeviceManager()
    commands_to_call = {}
    properties_to_call = {}
    for device_booking in exp.deviceBookings:
        device_uuid = device_booking.device
        for feature in device_manager.get_features_for_data_handler(device_uuid):
            _register_active_calls(commands_to_call, device_uuid,
                                   device_booking.end, feature, feature.commands)
            _register_active_calls(properties_to_call, device_uuid,
                                   device_booking.end, feature, feature.properties)
    jobs = data_handler.create_jobs(commands_to_call, properties_to_call)
    if exp.id in experiment_id_to_data_handler_jobs:
        # Keep jobs of an earlier start so that stop_experiment removes them too.
        experiment_id_to_data_handler_jobs[exp.id] = experiment_id_to_data_handler_jobs[exp.id] + jobs
    else:
        experiment_id_to_data_handler_jobs[exp.id] = jobs
def print_container_output(container, experiment_id):
    """
    Follow a container's output, echoing every line to the console, a log file
    and the experiment's log buffer.

    Blocks until the log stream ends, so it is meant to run in its own thread.
    The log file is written to ``<TEMP_DIRECTORY>/container`` and named after
    the current time and the experiment id.

    :param container: Docker container object providing ``logs(...)``
    :param experiment_id: Id of the experiment the container belongs to
    """
    # container_output = container.attach(logs=False, stream=True)
    container_output = container.logs(follow=True, timestamps=True, stream=True, stdout=True, stderr=True)
    print('Output thread started!')
    log_directory = os.path.join(TEMP_DIRECTORY, 'container')
    # open() does not create missing parent directories.
    os.makedirs(log_directory, exist_ok=True)
    log_path = os.path.join(log_directory, f'{datetime.now().strftime("%d_%m_%Y-%H_%M_%S")}_{str(experiment_id)}.log')
    with open(log_path, "w", encoding="utf-8") as file:
        for raw_line in container_output:
            # Decode once; undecodable bytes are replaced instead of raising,
            # which would otherwise end log forwarding for the whole run.
            line = raw_line.decode(errors='replace')
            print(line)
            file.write(line)
            forward_experiment_log(experiment_id=experiment_id, logging_message=line)
def wait_until_container_stops(container, experiment_id: int,
                               status_queue: queue.SimpleQueue):
    """
    Block until the container exits, report the outcome and remove the container.

    A status code of 0 is reported as FINISHED_SUCCESSFUL, anything else as
    ERROR.

    :param container: Docker container object providing ``wait()`` and ``remove()``
    :param experiment_id: Id of the experiment the container belongs to
    :type experiment_id: int
    :param status_queue: Queue that receives the resulting ProcessStatusEvent
    :type status_queue: queue.SimpleQueue
    """
    result = container.wait()
    # print(container.logs())
    exit_code = result["StatusCode"]
    print(f'container stopped with StatusCode {exit_code}')
    outcome = (ProcessStatusEventType.FINISHED_SUCCESSFUL
               if exit_code == 0 else ProcessStatusEventType.ERROR)
    status_queue.put(ProcessStatusEvent(experiment_id, outcome))
    container.remove()
def start_experiment(experiment_id: int, status_queue: queue.SimpleQueue):
    """
    Start an experiment: begin data handling, create and start its docker
    container and spawn the threads that forward its output and wait for its
    exit.

    Used as the callable of the scheduler jobs created by
    ``schedule_experiment`` and ``schedule_experiment_now``.

    :param experiment_id: Id of the experiment to start
    :type experiment_id: int
    :param status_queue: Queue that receives the STARTED event (with the
        container id as message) and, later, the exit event
    :type status_queue: queue.SimpleQueue
    """
    exp = experiment.get_experiment(experiment_id)
    script = get_user_script(exp.scriptID)
    devices = [
        asdict(get_device_info(booking.device)) for booking in exp.deviceBookings
    ]
    start_data_handling_for_experiment(exp)
    client = docker.from_env()
    container = docker_helper.create_script_container(client, exp.name,
                                                      script.data,
                                                      f'devices={devices}')
    print(f'Created docker container for experiment {experiment_id}: \"{container.name}\"')
    output_thread = Thread(target=print_container_output, args=(container, experiment_id, ), daemon=True)
    wait_thread = Thread(target=wait_until_container_stops,
                         args=(container, experiment_id, status_queue))
    container.start()
    output_thread.start()
    # Queue STARTED before the wait thread exists: the queue is FIFO, so the
    # exit event of a container that terminates immediately can no longer
    # overtake STARTED and leave the experiment stuck in RUNNING.
    status_queue.put(
        ProcessStatusEvent(experiment_id, ProcessStatusEventType.STARTED,
                           container.id))
    wait_thread.start()
def handle_process_status_events(event: ProcessStatusEvent):
    """
    Apply a process status event to the tracked experiment state.

    STARTED stores the container id and marks the experiment RUNNING.
    FINISHED_SUCCESSFUL and ERROR mark the experiment finished, unless it has
    already been marked FINISHED_MANUALLY. Other event types are ignored.

    :param event: The event taken from the process status queue
    :type event: ProcessStatusEvent
    :raises KeyError: If the event refers to an experiment that is not tracked
    """
    if event.event_type == ProcessStatusEventType.STARTED:
        experiments[event.experiment_id].container_id = event.message
        change_experiment_status(event.experiment_id, ExperimentStatus.RUNNING)
        print(f'{experiments[event.experiment_id].name} started')
    elif event.event_type in (ProcessStatusEventType.FINISHED_SUCCESSFUL,
                              ProcessStatusEventType.ERROR):
        entry = experiments[event.experiment_id]
        if entry.status == ExperimentStatus.FINISHED_MANUALLY:
            # A container stopped via stop_experiment usually exits with a
            # non-zero code; its exit event must not turn the manual stop
            # into FINISHED_ERROR (or FINISHED_SUCCESSFUL).
            print(f'{entry.name} was stopped manually')
            return
        if event.event_type == ProcessStatusEventType.FINISHED_SUCCESSFUL:
            change_experiment_status(event.experiment_id,
                                     ExperimentStatus.FINISHED_SUCCESSFUL)
            print(f'{entry.name} finished successful')
        else:
            change_experiment_status(event.experiment_id,
                                     ExperimentStatus.FINISHED_ERROR)
            print(f'{entry.name} Error')
def event_listener(event):
    """Scheduler listener callback: hand the event over to the main loop.

    The event is only enqueued here; it is processed by
    ``handle_scheduling_events`` when ``main`` drains ``event_queue``.
    """
    event_queue.put(event)
def handle_scheduling_events(event):
    """
    React to a scheduler event that belongs to one of the experiment jobs.

    Submitted jobs are marked SUBMITED_FOR_EXECUTION, failed or missed jobs
    FINISHED_ERROR; removed jobs are only reported on the console. Events with
    other codes, or for jobs that are not experiment jobs, are ignored.

    :param event: Scheduler event taken from ``event_queue``
    """
    code = event.code
    job_event_codes = (events.EVENT_JOB_SUBMITTED, events.EVENT_JOB_REMOVED,
                       events.EVENT_JOB_ERROR, events.EVENT_JOB_MISSED)
    # Only these event kinds are inspected for a job id.
    if code not in job_event_codes:
        return
    if event.job_id not in job_to_experiment:
        return

    experiment_id = job_to_experiment[event.job_id]
    experiment_entry = experiments[experiment_id]
    if code == events.EVENT_JOB_SUBMITTED:
        change_experiment_status(experiment_id, ExperimentStatus.SUBMITED_FOR_EXECUTION)
        print(f'{experiment_entry.name} submitted')
    elif code == events.EVENT_JOB_REMOVED:
        print(f'{experiment_entry.name} removed')
    elif code == events.EVENT_JOB_ERROR:
        change_experiment_status(experiment_id, ExperimentStatus.FINISHED_ERROR)
        print('scheduling error')
    else:
        change_experiment_status(experiment_id, ExperimentStatus.FINISHED_ERROR)
        print('experiment missed')
def schedule_future_experiments_from_database():
    """Pass every entry of ``experiment.get_scheduling_info()`` to ``schedule_experiment``."""
    for scheduling_info in experiment.get_scheduling_info():
        schedule_experiment(scheduling_info)
def schedule_experiment(exp: experiment.SchedulingInfo):
    """
    Schedule an experiment for its start time unless it is already pending.

    An experiment is (re)scheduled when it is unknown or its last run is in one
    of the finished states; otherwise the call does nothing.

    :param exp: Scheduling data (id, name, start, end) of the experiment
    :type exp: experiment.SchedulingInfo
    """
    finished_states = (ExperimentStatus.FINISHED_ERROR,
                       ExperimentStatus.FINISHED_SUCCESSFUL,
                       ExperimentStatus.FINISHED_MANUALLY)
    # Tracked and not finished: a job for it already exists.
    if exp.id in experiments and experiments[exp.id].status not in finished_states:
        return

    job = scheduler.add_job(start_experiment,
                            'date',
                            args=[exp.id, process_status_queue],
                            name=f'experiment:{exp.name}',
                            run_date=datetime.fromtimestamp(exp.start))
    initial_logs = deque(['Starting experiment..\n', 'Connection established.\n'],
                         maxlen=EXPERIMENT_LOG_BUFFER_LENGTH)
    experiments[exp.id] = ExperimentState(
        exp.id, exp.name, job.id, '0', exp.start, exp.end,
        ExperimentStatus.WAITING_FOR_EXECUTION, initial_logs)
    job_to_experiment[job.id] = exp.id
    change_experiment_status(exp.id, ExperimentStatus.WAITING_FOR_EXECUTION)
def schedule_experiment_now(exp: experiment.SchedulingInfo):
    """
    Start an experiment immediately, stopping a previous run of it first.

    :param exp: Object providing id, name, start and end of the experiment
    :type exp: experiment.SchedulingInfo
    """
    # The former status test ("!= A or != B or != C") was true for every status,
    # so stop_experiment always ran for tracked experiments. That behaviour is
    # kept, now stated explicitly: stop_experiment checks the status itself and
    # also removes leftover data handler jobs of the previous run.
    if exp.id in experiments:
        stop_experiment(exp.id)
    # No trigger is given, so the job is not tied to the experiment's start time.
    job = scheduler.add_job(start_experiment,
                            args=[exp.id, process_status_queue],
                            name=f'experiment:{exp.name}')
    experiments[exp.id] = ExperimentState(
        exp.id, exp.name, job.id, '0', exp.start, exp.end,
        ExperimentStatus.WAITING_FOR_EXECUTION,
        deque(['Starting experiment now!\n', 'Connection established.\n'],
              maxlen=EXPERIMENT_LOG_BUFFER_LENGTH))
    job_to_experiment[job.id] = exp.id
    change_experiment_status(exp.id, ExperimentStatus.WAITING_FOR_EXECUTION)
def stop_experiment(experiment_id):
    """
    Stop an experiment: remove its data handler jobs and cancel or stop its run.

    A waiting experiment has its scheduler job removed; a submitted or running
    one has its docker container stopped. In both cases the status becomes
    FINISHED_MANUALLY. Failures while stopping the container are only reported
    on the console.

    :param experiment_id: Id of the experiment to stop
    """
    # Stop data handling jobs for the experiment
    if experiment_id in experiment_id_to_data_handler_jobs:
        for data_job in experiment_id_to_data_handler_jobs[experiment_id]:
            data_job.remove()
        del experiment_id_to_data_handler_jobs[experiment_id]

    if experiment_id not in experiments:
        return
    entry = experiments[experiment_id]

    if entry.status == ExperimentStatus.WAITING_FOR_EXECUTION:
        # Not started yet: cancelling the scheduler job is enough.
        scheduler.remove_job(entry.job_id)
        change_experiment_status(experiment_id,
                                 ExperimentStatus.FINISHED_MANUALLY)
        return

    if entry.status not in (ExperimentStatus.SUBMITED_FOR_EXECUTION,
                            ExperimentStatus.RUNNING):
        return
    try:
        docker_client = docker.from_env()
        print(entry.container_id)
        running_container = docker_client.containers.get(entry.container_id)
        running_container.stop(timeout=1)
        change_experiment_status(experiment_id,
                                 ExperimentStatus.FINISHED_MANUALLY)
    except Exception:
        print("could not stop container")
def get_experiment_status(experiment_id):
    """
    Return the tracked status of an experiment.

    :param experiment_id: Id of the experiment to look up
    :return: The stored status, or ``ExperimentStatus.UNKNOWN`` if the
        experiment is not tracked
    """
    if experiment_id in experiments:
        return experiments[experiment_id].status
    return ExperimentStatus.UNKNOWN
def receive_and_execute_commands():
    """
    Poll the Redis subscription once and execute a received command, if any.

    Messages are msgpack maps with the keys 'command' and 'params'. 'start'
    schedules the experiment with id ``params[0]`` immediately, 'stop' stops
    it; other commands are ignored.
    """
    message = pubsub.get_message()
    # Nothing pending, or a non-data message (only type 'message' is handled).
    if message is None or message['type'] != 'message':
        return

    data = msgpack.unpackb(message['data'], raw=False)
    command, params = data['command'], data['params']
    if command == 'start':
        print('schedule experiment now')
        schedule_experiment_now(experiment.get_experiment(params[0]))
        return
    if command == 'stop':
        print('stop experiment')
        stop_experiment(params[0])
def main():
    """
    Run the scheduler service until the process is interrupted.

    Subscribes to the 'scheduler' Redis channel, starts the background
    scheduler and then loops: handle at most one scheduling event and one
    process status event, execute at most one received command, and every
    5 seconds pick up new experiments from the database. The scheduler is shut
    down when the loop is left through an exception (e.g. KeyboardInterrupt).
    """
    rescan_interval_seconds = 5
    # Pause used only when both queues were empty, so the loop does not occupy
    # a full CPU core while idle.
    idle_pause_seconds = 0.01

    pubsub.subscribe('scheduler')
    scheduler.add_listener(event_listener, events.EVENT_ALL)
    scheduler.start()
    try:
        schedule_future_experiments_from_database()
        # Monotonic clock: the rescan interval must not be affected by
        # wall-clock adjustments.
        last_rescan = time.monotonic()
        while True:
            idle = True
            try:
                handle_scheduling_events(event_queue.get_nowait())
                idle = False
            except queue.Empty:
                pass
            try:
                handle_process_status_events(process_status_queue.get_nowait())
                idle = False
            except queue.Empty:
                pass
            receive_and_execute_commands()
            now = time.monotonic()
            if now - last_rescan >= rescan_interval_seconds:
                last_rescan = now
                schedule_future_experiments_from_database()
            if idle:
                time.sleep(idle_pause_seconds)
    finally:
        # Previously placed after the endless loop and therefore never reached.
        scheduler.shutdown()
# Run the scheduler loop only when this file is executed as a script.
if __name__ == '__main__':
    main()