-
Notifications
You must be signed in to change notification settings - Fork 0
/
operator.py
208 lines (187 loc) · 8.44 KB
/
operator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import argparse
import paho.mqtt.client as mqtt
import configparser
import os
import json
import time
from threading import Lock
import color_log
class Operator:
    """MQTT operator that tracks registered clients and sends them commands.

    The operator subscribes to a registration topic (every payload received
    there is treated as a client id and acknowledged on the ack topic), a
    response topic (payloads are logged and optionally appended to a file)
    and, when ``receive_commands`` is set, a command-loader topic carrying
    JSON objects of the form ``{"client_id": ..., "command": ...}``.

    Commands are published on ``command_topic`` either as a JSON object or as
    ``"<client_id>|<command>"``, depending on ``jsonify``.

    Args:
        broker: Host name or address of the MQTT broker.
        port: TCP port of the MQTT broker.
        command_topic: Topic on which commands are published.
        response_topic: Topic on which client feedback is received.
        registration_topic: Topic on which clients announce their id.
        ack_topic: Topic on which registrations are acknowledged.
        command_loader_topic: Topic carrying externally supplied commands.
        registration_timeout: Seconds to keep waiting for registrations once
            at least one client is known.
        pipelines: Mapping of pipeline name to a ``;``-separated command list.
        pipeline_mode: Run every configured pipeline after registration.
        realtime_mode: Prompt for commands on stdin after registration.
        jsonify: Publish commands as JSON instead of ``id|command``.
        colorlog: Flag handed to ``color_log.enable_color_logging``.
        save_feedback: Append received feedback to ``feedback_file``.
        feedback_file: Path of the feedback log file.
        receive_commands: Subscribe to ``command_loader_topic``.
    """

    # Pause between polls while waiting for registrations, so the wait loop
    # neither spins a CPU core nor keeps contending for the client lock.
    _REGISTRATION_POLL_INTERVAL = 0.1

    def __init__(self,
                 broker: str,
                 port: int,
                 command_topic: str,
                 response_topic: str,
                 registration_topic: str,
                 ack_topic: str,
                 command_loader_topic: str,
                 registration_timeout: int,
                 pipelines: dict,
                 pipeline_mode: bool,
                 realtime_mode: bool,
                 jsonify: bool,
                 colorlog: bool,
                 save_feedback: bool,
                 feedback_file: str,
                 receive_commands: bool,
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._broker = broker
        self._port = port
        self._command_topic = command_topic
        self._response_topic = response_topic
        self._registration_topic = registration_topic
        self._ack_topic = ack_topic
        self._command_loader_topic = command_loader_topic
        self._registration_timeout = registration_timeout
        self._pipelines = pipelines
        self._pipeline_mode = pipeline_mode
        self._realtime_mode = realtime_mode
        self._jsonify = jsonify
        self._colorlog = colorlog
        self._save_feedback = save_feedback
        self._feedback_file = feedback_file
        self._receive_commands = receive_commands
        color_log.enable_color_logging(self._colorlog)
        self._clients = set()
        self._client = self._create_mqtt_client()
        self._client.on_connect = self.on_connect
        self._client.on_message = self.on_message
        # Guards self._clients, which is touched both by the MQTT callbacks
        # and by the thread that calls run().
        self._lock = Lock()

    @staticmethod
    def _create_mqtt_client():
        """Build the paho client in a way that works on paho-mqtt 1.x and 2.x."""
        api_version = getattr(mqtt, "CallbackAPIVersion", None)
        if api_version is None:
            # paho-mqtt 1.x: there is no callback API version to choose.
            return mqtt.Client()
        # paho-mqtt 2.x: the callbacks in this class use the 1.x signatures
        # (e.g. on_connect(client, userdata, flags, rc)), so request VERSION1.
        return mqtt.Client(api_version.VERSION1)

    def _snapshot_clients(self):
        """Return the registered client ids as a sorted list, copied under the lock."""
        with self._lock:
            return sorted(self._clients)

    def _format_message(self, client_id, command):
        """Encode one command for ``client_id`` in the configured wire format."""
        if self._jsonify:
            return json.dumps({"client_id": client_id, "command": command})
        return f"{client_id}|{command}"

    @staticmethod
    def _split_commands(pipeline_commands):
        """Split a ``;``-separated pipeline string into its non-empty commands."""
        stripped = (part.strip() for part in pipeline_commands.split(';'))
        # A trailing or doubled ';' yields empty fragments; drop them so no
        # empty command is ever published.
        return [part for part in stripped if part]

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe to the operator's topics once the client has connected."""
        color_log.log_info(f"Connected with result code {rc}")
        self._client.subscribe(self._registration_topic)
        self._client.subscribe(self._response_topic)
        if self._receive_commands:
            self._client.subscribe(self._command_loader_topic)

    def on_message(self, client, userdata, msg):
        """Dispatch an incoming message according to the topic it arrived on."""
        topic = msg.topic
        try:
            payload = msg.payload.decode()
        except UnicodeDecodeError as e:
            # NOTE(review): an exception escaping this callback may take down
            # paho's network loop - confirm against the installed paho version.
            color_log.log_error(f"Ignoring non-UTF-8 payload on {topic}: {e}")
            return

        if topic == self._registration_topic:
            with self._lock:
                is_new = payload not in self._clients
                if is_new:
                    self._clients.add(payload)
            if is_new:
                color_log.log_info(f"Registered client: {payload}")
            # Acknowledge repeats as well, so a client that re-sends its
            # registration still receives an ack.
            self._client.publish(self._ack_topic, payload)
        elif topic == self._response_topic:
            color_log.log_info(f"Received feedback:\n{payload}")
            if self._save_feedback:
                self.save_feedback_to_file(payload)
        elif topic == self._command_loader_topic:
            self.handle_command_loader(payload)

    def handle_command_loader(self, payload):
        """Parse a command-loader payload and forward the command it carries.

        The payload must be a JSON object with a non-empty string
        ``client_id`` and a truthy ``command``. A ``client_id`` of ``"all"``
        (case-insensitive) targets every registered client. Anything else is
        logged as an error and ignored.
        """
        try:
            command_data = json.loads(payload)
        except json.JSONDecodeError as e:
            color_log.log_error(f"Failed to decode JSON: {e}")
            return

        # Valid JSON is not necessarily an object ("[1]", "42", ...), and
        # client_id is not necessarily a string; reject both instead of
        # failing on .get() / .lower().
        if not isinstance(command_data, dict):
            color_log.log_error("Invalid command format")
            return
        client_id = command_data.get('client_id')
        command = command_data.get('command')
        if not isinstance(client_id, str) or not client_id or not command:
            color_log.log_error("Invalid command format")
            return

        if client_id.lower() == 'all':
            self.send_command_to_all_clients(command)
        else:
            self.send_command_to_client(client_id, command)

    def send_command_to_all_clients(self, command):
        """Send ``command`` to every client registered at the time of the call."""
        # Iterate over a snapshot so the lock is not held while publishing.
        for client_id in self._snapshot_clients():
            self.send_command_to_client(client_id, command)

    def send_command_to_client(self, client_id, command):
        """Publish ``command`` for ``client_id`` on the command topic and log it."""
        self._client.publish(self._command_topic, self._format_message(client_id, command))
        color_log.log_warning(f"Published command to {client_id}: {command}")

    def save_feedback_to_file(self, feedback: str):
        """Append one feedback entry, followed by a newline, to the feedback file."""
        with open(self._feedback_file, 'a', encoding='utf-8') as f:
            f.write(feedback + '\n')

    def _wait_for_registrations(self):
        """Block until clients are known and the registration window has elapsed.

        The window restarts on every poll that finds no clients, so it
        effectively begins when the first client registers.
        """
        window_start = time.monotonic()
        color_log.log_info("Waiting for clients to register...")
        while True:
            with self._lock:
                has_clients = bool(self._clients)
            now = time.monotonic()
            if not has_clients:
                window_start = now
            elif now - window_start > self._registration_timeout:
                return
            time.sleep(self._REGISTRATION_POLL_INTERVAL)

    def run(self):
        """Connect, wait for registrations, run the enabled modes, then disconnect."""
        self._client.connect(self._broker, self._port, keepalive=60)
        self._client.loop_start()
        try:
            self._wait_for_registrations()
            color_log.log_info(f"Registered clients: {', '.join(self._snapshot_clients())}")
            if self._pipeline_mode:
                self.run_pipelines()
            if self._realtime_mode:
                self.run_realtime_mode()
            input("Press Enter to exit...\n")
        finally:
            # Always stop the network loop and disconnect, including when the
            # operator is interrupted (e.g. Ctrl-C) or a mode raises.
            self._client.loop_stop()
            self._client.disconnect()

    def run_pipelines(self):
        """Send every configured pipeline, command by command, to each client."""
        color_log.log_info("Running pipelines...")
        # Work on a snapshot: holding the lock across the sleeps below would
        # block on_message whenever a registration arrives mid-pipeline.
        clients = self._snapshot_clients()
        for pipeline_commands in self._pipelines.values():
            commands = self._split_commands(pipeline_commands)
            for client_id in clients:
                for command_message in commands:
                    self.send_command_to_client(client_id, command_message)
                    time.sleep(1)  # Add a delay to ensure commands are processed sequentially

    def run_realtime_mode(self):
        """Read commands from stdin and send each to all clients until 'exit'."""
        color_log.log_info("Entering real-time command mode...")
        while True:
            try:
                command = input("Enter command to send to all clients (or 'exit' to quit): \n")
            except EOFError:
                # stdin was closed; there is nothing more to read.
                break
            command_message = command.strip()
            if command_message.lower() == 'exit':
                break
            if not command_message:
                # Blank input carries no command to send.
                continue
            self.send_command_to_all_clients(command_message)
if __name__ == '__main__':
    # Command-line entry point: load the INI configuration, build an Operator
    # from it and run it.
    parser = argparse.ArgumentParser(description="Operator for managing commands and clients.")
    parser.add_argument('--config', type=str, default='config.ini', help='Path to the configuration file.')
    args = parser.parse_args()

    config = configparser.ConfigParser()
    # ConfigParser.read() skips files it cannot open and returns the list of
    # files it actually parsed, so an empty result means the path is wrong.
    # Fail here with a clear message rather than with a KeyError further down.
    if not config.read(args.config):
        parser.error(f"Could not read configuration file: {args.config}")
    missing_sections = [name for name in ('mqtt', 'operator') if not config.has_section(name)]
    if missing_sections:
        parser.error(f"Configuration file {args.config} is missing section(s): {', '.join(missing_sections)}")

    mqtt_cfg = config['mqtt']
    operator_cfg = config['operator']

    # The MQTT_BROKER environment variable, when set and non-empty, overrides
    # the broker configured in the file.
    broker = os.getenv('MQTT_BROKER') or mqtt_cfg['broker']
    port = int(mqtt_cfg['port'])
    command_topic = mqtt_cfg['command_topic']
    response_topic = mqtt_cfg['response_topic']
    registration_topic = mqtt_cfg['registration_topic']
    ack_topic = mqtt_cfg['ack_topic']
    command_loader_topic = mqtt_cfg['command_loader_topic']

    registration_timeout = int(operator_cfg['registration_timeout'])
    pipeline_mode = config.getboolean('operator', 'enable_pipeline_mode')
    realtime_mode = config.getboolean('operator', 'enable_realtime_mode')
    jsonify = config.getboolean('operator', 'jsonify')
    colorlog = config.getboolean('operator', 'colorlog')
    save_feedback = config.getboolean('operator', 'save_feedback')
    feedback_file = operator_cfg['feedback_file']
    receive_commands = config.getboolean('operator', 'receive_commands')
    # Every [operator] option whose name starts with "pipeline" defines one
    # pipeline.
    pipelines = {k: v for k, v in operator_cfg.items() if k.startswith('pipeline')}

    operator = Operator(broker=broker,
                        port=port,
                        command_topic=command_topic,
                        response_topic=response_topic,
                        registration_topic=registration_topic,
                        ack_topic=ack_topic,
                        command_loader_topic=command_loader_topic,
                        registration_timeout=registration_timeout,
                        pipelines=pipelines,
                        pipeline_mode=pipeline_mode,
                        realtime_mode=realtime_mode,
                        jsonify=jsonify,
                        colorlog=colorlog,
                        save_feedback=save_feedback,
                        feedback_file=feedback_file,
                        receive_commands=receive_commands)
    operator.run()