generated from DiamondLightSource/bookshelf-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
daqmessenger.py
93 lines (71 loc) · 2.68 KB
/
daqmessenger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import json
import time
from collections import deque
import stomp
from stomp.exception import ConnectFailedException
class DaqScanListener(stomp.ConnectionListener):
def __init__(self):
self.queue = deque()
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
m = json.loads(frame.body)
self.queue.append(m)
class DaqScanListener4(stomp.ConnectionListener):
def __init__(self):
self.queue = deque()
def on_error(self, headers, message):
logger.error('Received an error "%s"' % message)
def on_message(self, headers, message):
m = json.loads(message)
self.queue.append(m)
class DaqMessenger:
def __init__(self, beamline):
self.beamline = beamline
self.old_stomp = stomp.__version__[0] == 4
def connect(self):
self.conn = stomp.Connection(
[(self.beamline, 61613)], auto_content_length=False
)
if self.old_stomp:
self.conn.start()
self.conn.connect()
def disconnect(self):
self.conn.disconnect()
def on_scan(self, message_function, sleep=1):
dsl = DaqScanListener4() if self.old_stomp else DaqScanListener()
self.conn.set_listener("scan", dsl)
self.conn.subscribe(
destination="/topic/gda.messages.scan", id=1, ack="auto")
while 1:
while dsl.queue:
m = dsl.queue.popleft()
message_function(m)
time.sleep(sleep)
def send_file(self, path):
message = json.dumps({"filePath": path})
destination = "/topic/org.dawnsci.file.topic"
self._send_message(destination, message)
def send_start(self, path):
message = json.dumps(
{"filePath": path, "status": "STARTED", "swmrStatus": "ENABLED"}
)
destination = "/topic/gda.messages.processing"
self._send_message(destination, message)
def send_update(self, path):
message = json.dumps(
{"filePath": path, "status": "UPDATED", "swmrStatus": "ACTIVE"}
)
destination = "/topic/gda.messages.processing"
self._send_message(destination, message)
def send_finished(self, path):
message = json.dumps(
{"filePath": path, "status": "FINISHED", "swmrStatus": "ACTIVE"}
)
destination = "/topic/gda.messages.processing"
self._send_message(destination, message)
def _send_message(self, destination, message):
if self.old_stomp:
self.conn.send(destination, message, ack="auto")
else:
self.conn.send(destination=destination, body=message, ack="auto")