# processhub_cmd.py -- 213 lines (191 loc), 7.31 KB
import json
import psutil
import redis
import typer
from snapshotter.init_rabbitmq import create_rabbitmq_conn
from snapshotter.init_rabbitmq import processhub_command_publish
from snapshotter.process_hub_core import PROC_STR_ID_TO_CLASS_MAP
from snapshotter.settings.config import settings
from snapshotter.utils.models.message_models import ProcessHubCommand
from snapshotter.utils.redis.redis_conn import REDIS_CONN_CONF
# Typer application object; the @app.command() functions below register on it.
app = typer.Typer()
def process_up(pid):
    """
    Report whether a process with the given PID is currently running.

    :param pid: numeric process ID to probe.
    :return: True if psutil finds the process and reports it as running,
        False if no process with that PID exists.
    """
    try:
        return psutil.Process(pid).is_running()
    except psutil.NoSuchProcess:
        # psutil.Process() raises for an absent PID; for a liveness probe
        # that simply means the process is not up.
        return False
def _report_pid_status(raw_pid, not_a_pid_msg, status_msg):
    """Print the running status for one PID entry taken from the process map."""
    try:
        pid = int(raw_pid)
    except (TypeError, ValueError):
        # TypeError covers a missing entry (None); ValueError covers a
        # non-numeric value.
        print(not_a_pid_msg, raw_pid)
        return
    try:
        running = process_up(pid)
    except psutil.NoSuchProcess:
        # A PID that no longer exists is reported as not running instead of
        # aborting the rest of the report.
        running = False
    print(status_msg, running)


@app.command()
def processReport():
    """
    Print the running status of the processes recorded in the Redis process map.

    Reads the hash ``powerloom:snapshotter:{namespace}:{instance_id}:Processes``
    and reports, in order, on the System Event Detector, the Processor
    Distributor and every callback worker listed under ``callback_workers``.
    Missing or malformed entries are printed as such rather than raising.
    """
    connection_pool = redis.BlockingConnectionPool(**REDIS_CONN_CONF)
    redis_conn = redis.Redis(connection_pool=connection_pool)
    map_raw = redis_conn.hgetall(
        name=f'powerloom:snapshotter:{settings.namespace}:{settings.instance_id}:Processes',
    )

    print('\n' + '=' * 20 + 'System Event Detector' + '=' * 20)
    _report_pid_status(
        map_raw.get(b'SystemEventDetector'),
        'Event detector pid found in process map not a PID: ',
        'Event detector process running status: ',
    )

    print('\n' + '=' * 20 + 'Worker Processor Distributor' + '=' * 20)
    _report_pid_status(
        map_raw.get(b'ProcessorDistributor'),
        'Processor distributor pid found in process map not a PID: ',
        'Processor distributor process running status: ',
    )

    print('\n' + '=' * 20 + 'Worker Processes' + '=' * 20)
    cb_worker_raw = map_raw.get(b'callback_workers')
    try:
        cb_worker_map = json.loads(cb_worker_raw)
    except (TypeError, ValueError):
        # TypeError: entry absent (None). ValueError: invalid JSON or bytes
        # that cannot be decoded (JSONDecodeError is a ValueError subclass).
        print('Callback worker entries in cache corrupted...', cb_worker_raw)
        return
    if not isinstance(cb_worker_map, dict):
        print('Callback worker entries in cache corrupted...', cb_worker_map)
        return

    for worker_type, workers in cb_worker_map.items():
        section_name = worker_type.capitalize()
        print('\n' + '*' * 10 + section_name + '*' * 10)
        if not workers or not isinstance(workers, dict):
            print(f'No {section_name} workers found in process map: ', workers)
            continue
        for short_id, worker in workers.items():
            print('\n' + '-' * 5 + short_id + '-' * 5)
            if not isinstance(worker, dict):
                print(f'Worker entry {short_id} in process map is malformed: ', worker)
                continue
            # Fall back to the map key when the entry carries no 'id' field.
            worker_name = worker.get('id', short_id)
            _report_pid_status(
                worker.get('pid'),
                f'Process name {worker_name} pid found in process map not a PID: ',
                f'Process name {worker_name} running status: ',
            )
# https://typer.tiangolo.com/tutorial/commands/context/#configuring-the-context
@app.command(
    context_settings={'allow_extra_args': True, 'ignore_unknown_options': True},
)
def start(ctx: typer.Context, process_str_id: str):
    """
    Ask ProcessHubCore, over RabbitMQ, to launch the process named by process_str_id.

    Args:
        ctx: Typer context of the invocation (not read by this command).
        process_str_id: identifier of the process to start; it must be a key
            of PROC_STR_ID_TO_CLASS_MAP.

    Returns:
        None. For an unknown identifier an error is written to stderr and
        nothing is published.
    """
    recognised = process_str_id in PROC_STR_ID_TO_CLASS_MAP.keys()
    if not recognised:
        typer.secho(
            'Unknown Process identifier supplied. Check list with listProcesses command',
            err=True,
            fg=typer.colors.RED,
        )
        return

    typer.secho('Creating RabbitMQ connection...', fg=typer.colors.GREEN)
    connection = create_rabbitmq_conn()
    typer.secho('Opening RabbitMQ channel...', fg=typer.colors.GREEN)
    channel = connection.channel()

    # The start command is always sent with an empty set of init kwargs.
    launch_cmd = ProcessHubCommand(
        command='start',
        proc_str_id=process_str_id,
        init_kwargs={},
    )
    processhub_command_publish(channel, launch_cmd.json())
    typer.secho(
        f'Sent command to ProcessHubCore to launch process {process_str_id} | Command: {launch_cmd.json()}',
        fg=typer.colors.YELLOW,
    )
@app.command()
def stop(
    process_str_id: str = typer.Argument(...),
    pid: bool = typer.Option(
        False,
        help='Using this flag allows you to pass a process ID instead of the name',
    ),
):
    """
    Stop a process by sending a 'stop' command to ProcessHubCore over RabbitMQ.

    Args:
        process_str_id (str): Identifier of the process to stop. Without the
            pid flag it must be a key of PROC_STR_ID_TO_CLASS_MAP or the
            literal 'self'; with the pid flag it must parse as an integer.
        pid (bool): If True, process_str_id is interpreted as a process ID
            instead of a name.

    Returns:
        None. Invalid input prints an error to stderr and nothing is
        published; no RabbitMQ connection is opened in that case.
    """
    # Validate and build the command before touching RabbitMQ so that bad
    # input never opens a connection.
    if pid:
        try:
            target_pid = int(process_str_id)
        except ValueError:
            typer.secho(
                f'Expected an integer process ID, got: {process_str_id}',
                err=True,
                fg=typer.colors.RED,
            )
            return
        proc_hub_cmd = ProcessHubCommand(
            command='stop',
            pid=target_pid,
        )
        sent_msg = f'Sent command to ProcessHubCore to stop process PID {target_pid}'
    else:
        if (
            process_str_id not in PROC_STR_ID_TO_CLASS_MAP and
            process_str_id != 'self'
        ):
            typer.secho(
                'Unknown Process identifier supplied. Check list with listProcesses command',
                err=True,
                fg=typer.colors.RED,
            )
            return
        proc_hub_cmd = ProcessHubCommand(
            command='stop',
            proc_str_id=process_str_id,
        )
        sent_msg = (
            f'Sent command to ProcessHubCore to stop process {process_str_id} '
            f'| Command: {proc_hub_cmd.json()}'
        )

    typer.secho('Creating RabbitMQ connection...', fg=typer.colors.GREEN)
    c = create_rabbitmq_conn()
    typer.secho('Opening RabbitMQ channel...', fg=typer.colors.GREEN)
    ch = c.channel()
    processhub_command_publish(ch, proc_hub_cmd.json())
    typer.secho(sent_msg, fg=typer.colors.YELLOW)
@app.command()
def respawn():
    """
    Publish a 'respawn' command for ProcessHubCore on a fresh RabbitMQ channel.
    """
    connection = create_rabbitmq_conn()
    typer.secho('Opening RabbitMQ channel...', fg=typer.colors.GREEN)
    channel = connection.channel()

    respawn_cmd = ProcessHubCommand(command='respawn')
    processhub_command_publish(channel, respawn_cmd.json())

    typer.secho(
        f'Sent command to ProcessHubCore | Command: {respawn_cmd.json()}',
        fg=typer.colors.YELLOW,
    )
# Run the Typer CLI only when this file is executed as a script.
if __name__ == '__main__':
    app()